1 Star 0 Fork 0

流小强 / clientv3

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
snapshotter.go 6.56 KB
一键复制 编辑 原始数据 按行查看 历史
流小强 提交于 2020-03-22 17:56 . etcdv3
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package snap
import (
"errors"
"fmt"
"hash/crc32"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strings"
"time"
"go.etcd.io/etcd/etcdserver/api/snap/snappb"
pioutil "go.etcd.io/etcd/pkg/ioutil"
"go.etcd.io/etcd/pkg/pbutil"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
"go.uber.org/zap"
)
// snapSuffix is the file name suffix that marks a file in the snap directory
// as a snapshot file.
const snapSuffix = ".snap"

// Sentinel errors returned by the snapshot loading code in this file.
var (
	// ErrNoSnapshot is returned when the snap directory holds no snapshot
	// file, or when none of the snapshot files could be loaded.
	ErrNoSnapshot = errors.New("snap: no available snapshot")
	// ErrEmptySnapshot is returned by Read when the snapshot file is empty
	// or decodes to a record with no data or a zero checksum.
	ErrEmptySnapshot = errors.New("snap: empty snapshot")
	// ErrCRCMismatch is returned by Read when the checksum stored in the
	// snapshot file does not match the checksum of its data.
	ErrCRCMismatch = errors.New("snap: crc mismatch")
)

// crcTable is the CRC-32 (Castagnoli polynomial) table used to checksum
// snapshot data on save and to verify it on read.
var crcTable = crc32.MakeTable(crc32.Castagnoli)

// validFiles lists the names of non-snapshot files that are allowed to be
// present in the snap directory; checkSuffix does not warn about them.
var validFiles = map[string]bool{
	"db": true,
}
// Snapshotter saves raft snapshots to, and loads them from, snapshot files
// kept in a single directory.
type Snapshotter struct {
	// lg is the logger used for warnings and informational messages. New
	// replaces a nil logger with a no-op logger.
	lg *zap.Logger
	// dir is the path of the directory that holds the snapshot files.
	dir string
}
// New returns a Snapshotter that operates on the snapshot directory dir and
// logs through lg. When lg is nil a no-op logger is used instead, so the
// Snapshotter's methods can log unconditionally.
func New(lg *zap.Logger, dir string) *Snapshotter {
	logger := lg
	if logger == nil {
		logger = zap.NewNop()
	}
	return &Snapshotter{lg: logger, dir: dir}
}
// SaveSnap writes snapshot to the snapshot directory. A snapshot that
// raft.IsEmptySnap reports as empty is skipped and nil is returned;
// otherwise the result of writing the snapshot file is returned.
func (s *Snapshotter) SaveSnap(snapshot raftpb.Snapshot) error {
	if !raft.IsEmptySnap(snapshot) {
		return s.save(&snapshot)
	}
	return nil
}
// save serializes snapshot and writes it to a file in the snapshot directory.
//
// The file is named "<term>-<index>.snap", with term and index each printed
// as 16 zero-padded hex digits. Its content is a marshaled snappb.Snapshot
// holding the marshaled raft snapshot together with a CRC-32 (Castagnoli)
// checksum of those bytes, which Read verifies when loading the file.
//
// On a write failure the (possibly partial) file is removed on a best-effort
// basis and the write error is returned; a failure of the removal itself is
// only logged. The marshalling, fsync and total save durations are reported
// to the snapMarshallingSec, snapFsyncSec and snapSaveSec metrics.
func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
	start := time.Now()

	fname := fmt.Sprintf("%016x-%016x%s", snapshot.Metadata.Term, snapshot.Metadata.Index, snapSuffix)
	b := pbutil.MustMarshal(snapshot)
	crc := crc32.Update(0, crcTable, b)
	snap := snappb.Snapshot{Crc: crc, Data: b}
	d, err := snap.Marshal()
	if err != nil {
		return err
	}
	snapMarshallingSec.Observe(time.Since(start).Seconds())

	spath := filepath.Join(s.dir, fname)

	fsyncStart := time.Now()
	err = pioutil.WriteAndSyncFile(spath, d, 0666)
	// The fsync duration is recorded whether or not the write succeeded.
	snapFsyncSec.Observe(time.Since(fsyncStart).Seconds())

	if err != nil {
		s.lg.Warn("failed to write a snap file", zap.String("path", spath), zap.Error(err))
		// Do not leave a broken snapshot file behind. Report the removal
		// error (rerr) here, not the write error that was already logged.
		if rerr := os.Remove(spath); rerr != nil {
			s.lg.Warn("failed to remove a broken snap file", zap.String("path", spath), zap.Error(rerr))
		}
		return err
	}

	// The total save time is only observed for successful saves.
	snapSaveSec.Observe(time.Since(start).Seconds())
	return nil
}
// Load returns the first snapshot that can be read successfully, trying the
// snapshot files in the order produced by snapNames (newest first).
//
// An error from listing the snapshot directory is returned as is, which
// includes ErrNoSnapshot when the directory has no snapshot files. If every
// candidate file fails to load, ErrNoSnapshot is returned. Files that fail
// to load are renamed by loadSnap as a side effect.
func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
	candidates, err := s.snapNames()
	if err != nil {
		return nil, err
	}

	for i := range candidates {
		loaded, lerr := loadSnap(s.lg, s.dir, candidates[i])
		if lerr == nil {
			return loaded, nil
		}
	}

	// snapNames never returns an empty list without an error, so reaching
	// this point means every candidate failed to load.
	return nil, ErrNoSnapshot
}
// loadSnap reads the snapshot file name inside dir.
//
// When the file cannot be read it is renamed to "<path>.broken", so that it
// no longer carries the snapshot suffix, and the read error is returned. The
// outcome of the rename is only logged. All logging is skipped when lg is
// nil.
func loadSnap(lg *zap.Logger, dir, name string) (*raftpb.Snapshot, error) {
	fpath := filepath.Join(dir, name)

	snap, err := Read(lg, fpath)
	if err == nil {
		return snap, nil
	}

	brokenPath := fpath + ".broken"
	if lg != nil {
		lg.Warn("failed to read a snap file", zap.String("path", fpath), zap.Error(err))
	}

	rerr := os.Rename(fpath, brokenPath)
	if lg != nil {
		if rerr != nil {
			lg.Warn("failed to rename a broken snap file", zap.String("path", fpath), zap.String("broken-path", brokenPath), zap.Error(rerr))
		} else {
			lg.Warn("renamed to a broken snap file", zap.String("path", fpath), zap.String("broken-path", brokenPath))
		}
	}
	return snap, err
}
// Read loads and validates the snapshot stored in the file at path snapname.
//
// The file is expected to hold a marshaled snappb.Snapshot whose Data field
// is a marshaled raftpb.Snapshot and whose Crc field is the CRC-32
// (Castagnoli) checksum of Data.
//
// It returns:
//   - the underlying error if the file cannot be read or either unmarshal
//     step fails;
//   - ErrEmptySnapshot if the file is empty, or the decoded record has no
//     data or a zero checksum;
//   - ErrCRCMismatch if the stored checksum does not match the data.
//
// Every failure is also logged as a warning when lg is non-nil.
func Read(lg *zap.Logger, snapname string) (*raftpb.Snapshot, error) {
	raw, err := ioutil.ReadFile(snapname)
	if err != nil {
		if lg != nil {
			lg.Warn("failed to read a snap file", zap.String("path", snapname), zap.Error(err))
		}
		return nil, err
	}
	if len(raw) == 0 {
		if lg != nil {
			lg.Warn("failed to read empty snapshot file", zap.String("path", snapname))
		}
		return nil, ErrEmptySnapshot
	}

	// Decode the outer record that carries the checksum and the payload.
	var envelope snappb.Snapshot
	if uerr := envelope.Unmarshal(raw); uerr != nil {
		if lg != nil {
			lg.Warn("failed to unmarshal snappb.Snapshot", zap.String("path", snapname), zap.Error(uerr))
		}
		return nil, uerr
	}
	if envelope.Crc == 0 || len(envelope.Data) == 0 {
		if lg != nil {
			lg.Warn("failed to read empty snapshot data", zap.String("path", snapname))
		}
		return nil, ErrEmptySnapshot
	}

	// Verify the payload against the stored checksum before decoding it.
	if sum := crc32.Checksum(envelope.Data, crcTable); sum != envelope.Crc {
		if lg != nil {
			lg.Warn("snap file is corrupt",
				zap.String("path", snapname),
				zap.Uint32("prev-crc", envelope.Crc),
				zap.Uint32("new-crc", sum),
			)
		}
		return nil, ErrCRCMismatch
	}

	decoded := new(raftpb.Snapshot)
	if uerr := decoded.Unmarshal(envelope.Data); uerr != nil {
		if lg != nil {
			lg.Warn("failed to unmarshal raftpb.Snapshot", zap.String("path", snapname), zap.Error(uerr))
		}
		return nil, uerr
	}
	return decoded, nil
}
// snapNames lists the snapshot files in the snapshot directory, sorted in
// descending lexical order of their names. Because save names files with
// zero-padded hex term and index, this puts the snapshot with the highest
// term and index first.
//
// Orphaned defragmentation files found in the directory are removed first
// (see cleanupSnapdir). If the directory contains no snapshot files,
// ErrNoSnapshot is returned.
func (s *Snapshotter) snapNames() ([]string, error) {
	d, err := os.Open(s.dir)
	if err != nil {
		return nil, err
	}
	defer d.Close()

	entries, err := d.Readdirnames(-1)
	if err != nil {
		return nil, err
	}
	if cerr := s.cleanupSnapdir(entries); cerr != nil {
		return nil, cerr
	}

	snaps := checkSuffix(s.lg, entries)
	if len(snaps) == 0 {
		return nil, ErrNoSnapshot
	}

	// Descending order: later names sort before earlier ones.
	sort.Slice(snaps, func(i, j int) bool { return snaps[i] > snaps[j] })
	return snaps, nil
}
// checkSuffix returns the entries of names that end with the snapshot
// suffix, in their original order. The result is never nil.
//
// Every other entry is skipped; if it is not listed in validFiles and lg is
// non-nil, a warning is logged for it.
func checkSuffix(lg *zap.Logger, names []string) []string {
	snaps := []string{}
	for _, name := range names {
		if strings.HasSuffix(name, snapSuffix) {
			snaps = append(snaps, name)
			continue
		}
		// Not a snapshot file: stay silent for files that are known to be
		// valid in the snap directory, warn about anything else.
		if _, known := validFiles[name]; known || lg == nil {
			continue
		}
		lg.Warn("found unexpected non-snap file; skipping", zap.String("path", name))
	}
	return snaps
}
// cleanupSnapdir deletes files that should not be left in the snapshot
// directory:
//   - files whose name starts with "db.tmp", which can be orphaned by
//     defragmentation.
//
// filenames are names relative to the snapshot directory. A file that is
// already gone is not treated as an error; any other removal failure stops
// the cleanup and is returned.
func (s *Snapshotter) cleanupSnapdir(filenames []string) error {
	for _, filename := range filenames {
		if !strings.HasPrefix(filename, "db.tmp") {
			continue
		}
		s.lg.Info("found orphaned defragmentation file; deleting", zap.String("path", filename))

		rmErr := os.Remove(filepath.Join(s.dir, filename))
		if rmErr == nil || os.IsNotExist(rmErr) {
			continue
		}
		return fmt.Errorf("failed to remove orphaned defragmentation file %s: %v", filename, rmErr)
	}
	return nil
}
1
https://gitee.com/liuzhiqiang9696/clientv3.git
git@gitee.com:liuzhiqiang9696/clientv3.git
liuzhiqiang9696
clientv3
clientv3
518322da16f5

搜索帮助