2 Star 2 Fork 1

cockroachdb/cockroach

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
stream_merger.go 4.52 KB
一键复制 编辑 原始数据 按行查看 历史
Tobias Schottdorf 提交于 2017-07-31 17:15 . *: remove // Author: comments
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package distsqlrun
import (
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/pkg/errors"
)
// We define a group to be a set of rows from a given source with the same
// group key, in this case the set of ordered columns. streamMerger emits
// batches of rows that are the cross-product of matching groups from each
// stream.
type streamMerger struct {
left streamGroupAccumulator
right streamGroupAccumulator
datumAlloc sqlbase.DatumAlloc
}
// NextBatch returns a set of rows from the left stream and a set of rows from
// the right stream, all matching on the equality columns. One of the sets can
// be empty.
func (sm *streamMerger) NextBatch() ([]sqlbase.EncDatumRow, []sqlbase.EncDatumRow, error) {
lrow, err := sm.left.peekAtCurrentGroup()
if err != nil {
return nil, nil, err
}
rrow, err := sm.right.peekAtCurrentGroup()
if err != nil {
return nil, nil, err
}
if lrow == nil && rrow == nil {
return nil, nil, nil
}
cmp, err := CompareEncDatumRowForMerge(lrow, rrow, sm.left.ordering, sm.right.ordering, &sm.datumAlloc)
if err != nil {
return nil, nil, err
}
var leftGroup, rightGroup []sqlbase.EncDatumRow
if cmp <= 0 {
leftGroup, err = sm.left.advanceGroup()
if err != nil {
return nil, nil, err
}
}
if cmp >= 0 {
rightGroup, err = sm.right.advanceGroup()
if err != nil {
return nil, nil, err
}
}
return leftGroup, rightGroup, nil
}
// CompareEncDatumRowForMerge EncDatumRow compares two EncDatumRows for merging.
// When merging two streams and preserving the order (as in a MergeSort or
// a MergeJoin) compare the head of the streams, emitting the one that sorts
// first. It allows for the EncDatumRow to be nil if one of the streams is
// exhausted (and hence nil). CompareEncDatumRowForMerge returns 0 when both
// rows are nil, and a nil row is considered greater than any non-nil row.
// CompareEncDatumRowForMerge assumes that the two rows have the same columns
// in the same orders, but can handle different ordering directions. It takes
// a DatumAlloc which is used for decoding if any underlying EncDatum is not
// yet decoded.
func CompareEncDatumRowForMerge(
lhs, rhs sqlbase.EncDatumRow,
leftOrdering, rightOrdering sqlbase.ColumnOrdering,
da *sqlbase.DatumAlloc,
) (int, error) {
if lhs == nil && rhs == nil {
return 0, nil
}
if lhs == nil {
return 1, nil
}
if rhs == nil {
return -1, nil
}
if len(leftOrdering) != len(rightOrdering) {
return 0, errors.Errorf(
"cannot compare two EncDatumRow types that have different length ColumnOrderings",
)
}
// TODO(radu): plumb EvalContext
evalCtx := &parser.EvalContext{}
for i, ord := range leftOrdering {
lIdx := ord.ColIdx
rIdx := rightOrdering[i].ColIdx
cmp, err := lhs[lIdx].Compare(da, evalCtx, &rhs[rIdx])
if err != nil {
return 0, err
}
if cmp != 0 {
if leftOrdering[i].Direction == encoding.Descending {
cmp = -cmp
}
return cmp, nil
}
}
return 0, nil
}
// makeStreamMerger creates a streamMerger, joining rows from leftSource with
// rows from rightSource.
//
// All metadata from the sources is forwarded to metadataSink.
func makeStreamMerger(
leftSource RowSource,
leftOrdering sqlbase.ColumnOrdering,
rightSource RowSource,
rightOrdering sqlbase.ColumnOrdering,
metadataSink RowReceiver,
) (streamMerger, error) {
if len(leftOrdering) != len(rightOrdering) {
return streamMerger{}, errors.Errorf(
"ordering lengths don't match: %d and %d", len(leftOrdering), len(rightOrdering))
}
for i, ord := range leftOrdering {
if ord.Direction != rightOrdering[i].Direction {
return streamMerger{}, errors.New("Ordering mismatch")
}
}
return streamMerger{
left: makeStreamGroupAccumulator(
MakeNoMetadataRowSource(leftSource, metadataSink),
leftOrdering),
right: makeStreamGroupAccumulator(
MakeNoMetadataRowSource(rightSource, metadataSink),
rightOrdering),
}, nil
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/mirrors_cockroachdb/cockroach.git
git@gitee.com:mirrors_cockroachdb/cockroach.git
mirrors_cockroachdb
cockroach
cockroach
v1.1.8

搜索帮助