It provides an easy way to create a flow process and can significantly improve the efficiency of data processing.
For example, we count the words in a file and get the 10 most frequent words. The test file is small; you can enlarge it by copying its contents several times. Let's compare the two approaches below:
// General way: a single goroutine reads, splits, counts, and sorts.
wordCount := map[string]int{}
reverse := true
//You can replace the file with a larger file.
file := "testfile/2553.txt"
start := time.Now()
f, err := os.Open(file)
if err != nil {
	panic(err)
}
defer f.Close()
sc := bufio.NewScanner(f)
//split lines
for sc.Scan() {
	line := sc.Text()
	sps := splitText(line)
	for i := 0; i < len(sps); i++ {
		st := strings.TrimSpace(sps[i])
		if len(st) > 0 {
			wordCount[st]++
		}
	}
}
// Surface scanner failures (read errors, over-long lines) instead of
// silently treating them as end-of-file.
if err := sc.Err(); err != nil {
	panic(err)
}
//sort by word occurrence times desc
sortedWc := sortWc(wordCount, reverse)
duration := time.Since(start)
//print elapsed time
fmt.Printf("duration(ms):%v\n", duration.Milliseconds())
//print topN (clamped to the number of distinct words)
topN := 10
if topN > len(sortedWc) {
	topN = len(sortedWc)
}
fmt.Println("sortedWc-top", topN, ":")
for i := 0; i < topN; i++ {
	fmt.Println(sortedWc[i])
}
The 'General way' is slow and shows low CPU and IO utilization when the file is very large.
In the 'Flow and Parallel way' below, we separate the IO and CPU operations into different flow nodes.
// ReadFileProcessor reads file lines and puts each line into an OutTaskChan
// for the next flow-node to process.
type ReadFileProcessor struct {
	// Filepath is the path of the file to read line by line.
	Filepath string
}
// Proccess opens g.Filepath and streams it line by line into outTask.
// It stops early when ctx is cancelled. ("Proccess" spelling matches the
// flowprocess interface and must not be renamed here.)
func (g *ReadFileProcessor) Proccess(inTasks flowprocess.InTaskChan, outTask flowprocess.OutTaskChan, ctx context.Context) (cancelAllProcess bool) {
	f, err := os.Open(g.Filepath)
	if err != nil {
		panic(err)
	}
	defer f.Close()
	sc := bufio.NewScanner(f)
	for sc.Scan() {
		line := sc.Text()
		// Select on both cancellation and the send: the original polled
		// Done and then sent unconditionally, so a send blocked by a slow
		// or cancelled downstream node could hang this goroutine forever.
		select {
		case <-ctx.Done():
			return
		case outTask <- line:
		}
	}
	// Surface scanner failures (read errors, over-long lines) instead of
	// silently treating them as end-of-file.
	if err := sc.Err(); err != nil {
		panic(err)
	}
	return
}
// SplitAndCountProcessor splits each incoming line into words and counts
// word occurrences. It carries no state of its own; counts live in a map
// local to each Proccess call.
type SplitAndCountProcessor struct {
}
// Proccess consumes lines from inTasks, splits each line into words, and
// accumulates per-word counts in a local map. When inTasks is closed, the
// partial word-count map is emitted on outTask for downstream summation.
// It returns true (cancel the whole flow) when ctx is cancelled.
func (s *SplitAndCountProcessor) Proccess(inTasks flowprocess.InTaskChan, outTask flowprocess.OutTaskChan, ctx context.Context) (cancelAllProcess bool) {
	wordCount := map[string]int{}
	for {
		select {
		case <-ctx.Done():
			return true
		case task, ok := <-inTasks:
			if !ok {
				// Upstream finished: hand off the partial counts.
				// Guard the send with ctx so a cancelled flow cannot
				// block this goroutine forever (the original send was
				// unconditional).
				select {
				case <-ctx.Done():
					return true
				case outTask <- wordCount:
				}
				return
			}
			line := task.(string)
			sps := splitText(line)
			for i := 0; i < len(sps); i++ {
				st := strings.TrimSpace(sps[i])
				if len(st) > 0 {
					wordCount[st]++
				}
			}
		}
	}
}
// SumWordCountProcessor merges the partial word-count maps produced by the
// split/count nodes into one total and sorts the result.
type SumWordCountProcessor struct {
	// reverse selects descending order when passed to sortWc.
	reverse bool
}
// Proccess merges every partial word-count map received on inTasks into a
// single total. When inTasks is closed, the total is sorted with sortWc
// (ordered by s.reverse) and emitted on outTask as the flow result.
// It returns true (cancel the whole flow) when ctx is cancelled.
func (s *SumWordCountProcessor) Proccess(inTasks flowprocess.InTaskChan, outTask flowprocess.OutTaskChan, ctx context.Context) (cancelAllProcess bool) {
	wordCount := map[string]int{}
	for {
		select {
		case <-ctx.Done():
			return true
		case task, ok := <-inTasks:
			if !ok {
				// Upstream finished: sort and publish the final result.
				// Guard the send with ctx so a cancelled flow cannot
				// block this goroutine forever (the original send was
				// unconditional).
				sortedWc := sortWc(wordCount, s.reverse)
				select {
				case <-ctx.Done():
					return true
				case outTask <- sortedWc:
				}
				return
			}
			wc := task.(map[string]int)
			for key, val := range wc {
				wordCount[key] += val
			}
		}
	}
}
// Flow and Parallel way: read -> split/count (x4 in parallel) -> summarize.
start := time.Now()
flow := flowprocess.NewFlow()
queueCount := 4000
// Node-0: a single processor streams file lines into the flow.
flow.AddNodeProcessors(queueCount,
	&ReadFileProcessor{
		// You can replace the file with a larger file.
		Filepath: "testfile/2553.txt",
	})
// Node-1: four parallel processors split lines and count words.
flow.AddNodeProcessors(queueCount,
	&SplitAndCountProcessor{},
	&SplitAndCountProcessor{},
	&SplitAndCountProcessor{},
	&SplitAndCountProcessor{},
)
// Node-2: a single processor merges the partial counts and sorts them.
summarizer := &SumWordCountProcessor{
	reverse: true,
}
flow.AddNodeProcessors(1,
	summarizer,
)
flow.Start()
if res, ok := flow.Result(); ok {
	sortedWc := res.([]wordAndCount)
	elapsed := time.Since(start)
	fmt.Printf("duration(ms):%v\n", elapsed.Milliseconds())
	// Show at most the 10 most frequent words.
	topN := 10
	if len(sortedWc) < topN {
		topN = len(sortedWc)
	}
	fmt.Println("sortedWc-top", topN, ":")
	for rank := 0; rank < topN; rank++ {
		fmt.Println(sortedWc[rank])
	}
}
The 'Flow and Parallel way' is faster and has higher CPU and IO usage when the file is very large.
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。
Activity
Community
Health
Trend
Influence
:Code submit frequency
:React/respond to issue & PR etc.
:Well-balanced team members and collaboration
:Recent popularity of project
:Star counts, download counts etc.