Fetch the repository succeeded.
/*
Copyright 2021 SANGFOR TECHNOLOGIES
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package orcraft
import (
"encoding/json"
"fmt"
"gitee.com/opengauss/ham4db/go/dtstruct"
"net"
"os"
"strings"
"time"
"gitee.com/opengauss/ham4db/go/core/log"
slog "log"
"github.com/hashicorp/raft"
)
type Store struct {
raftDir string
raftBind string
raftAdvertise string
raft *raft.Raft // The consensus mechanism
peerStore raft.PeerStore
applier CommandApplier
snapshotCreatorApplier dtstruct.SnapshotHandler
}
type storeCommand struct {
Op string `json:"op,omitempty"`
Value []byte `json:"value,omitempty"`
}
// NewStore inits and returns a new store
func NewStore(raftDir string, raftBind string, raftAdvertise string, applier CommandApplier, snapshotCreatorApplier dtstruct.SnapshotHandler) *Store {
return &Store{
raftDir: raftDir,
raftBind: raftBind,
raftAdvertise: raftAdvertise,
applier: applier,
snapshotCreatorApplier: snapshotCreatorApplier,
}
}
// Open opens the store. If enableSingle is set, and there are no existing peers,
// then this node becomes the first node, and therefore leader, of the cluster.
func (store *Store) Open(peerNodes []string) error {
// Setup Raft configuration.
config := raft.DefaultConfig()
config.SnapshotThreshold = 1
config.SnapshotInterval = snapshotInterval
config.ShutdownOnRemove = false
config.Logger = slog.New(&log.RLog{}, "", log.Lmicroseconds)
// Setup Raft communication.
advertise, err := net.ResolveTCPAddr("tcp", store.raftAdvertise)
if err != nil {
return err
}
log.Debugf("raft: advertise=%+v", advertise)
transport, err := raft.NewTCPTransport(store.raftBind, advertise, 3, 10*time.Second, os.Stderr)
if err != nil {
return err
}
log.Debugf("raft: transport=%+v", transport)
peers := make([]string, 0, 10)
for _, peerNode := range peerNodes {
peerNode = strings.TrimSpace(peerNode)
peers = raft.AddUniquePeer(peers, peerNode)
}
log.Debugf("raft: peers=%+v", peers)
// Create peer storage.
peerStore := &raft.StaticPeers{}
if err := peerStore.SetPeers(peers); err != nil {
return err
}
// Allow the node to enter single-mode, potentially electing itself, if
// explicitly enabled and there is only 1 node in the cluster already.
if len(peerNodes) == 0 && len(peers) <= 1 {
log.Infof("enabling single-node mode")
config.EnableSingleNode = true
config.DisableBootstrapAfterElect = false
}
if _, err := os.Stat(store.raftDir); err != nil {
if os.IsNotExist(err) {
// path does not exist
log.Debugf("raft: creating data dir %s", store.raftDir)
if err := os.MkdirAll(store.raftDir, os.ModePerm); err != nil {
return log.Errorf("RaftDataDir (%s) does not exist and cannot be created: %+v", store.raftDir, err)
}
} else {
// Other error
return log.Errorf("RaftDataDir (%s) error: %+v", store.raftDir, err)
}
}
// Create the snapshot store. This allows the Raft to truncate the log.
snapshots, err := NewFileSnapshotStore(store.raftDir, retainSnapshotCount, os.Stderr)
if err != nil {
return log.Errorf("file snapshot store: %s", err)
}
// Create the log store and stable store.
logStore := NewRelationalStore(store.raftDir)
log.Debugf("raft: logStore=%+v", logStore)
// Instantiate the Raft systems.
if store.raft, err = raft.NewRaft(config, (*fsm)(store), logStore, logStore, snapshots, peerStore, transport); err != nil {
return fmt.Errorf("error creating new raft: %s", err)
}
store.peerStore = peerStore
log.Infof("new raft created")
return nil
}
// AddPeer adds a node, located at addr, to this store. The node must be ready to
// respond to Raft communications at that address.
func (store *Store) AddPeer(addr string) error {
log.Infof("received join request for remote node %s", addr)
f := store.raft.AddPeer(addr)
if f.Error() != nil {
return f.Error()
}
log.Infof("node at %s joined successfully", addr)
return nil
}
// RemovePeer removes a node from this raft setup
func (store *Store) RemovePeer(addr string) error {
log.Infof("received remove request for remote node %s", addr)
f := store.raft.RemovePeer(addr)
if f.Error() != nil {
return f.Error()
}
log.Infof("node at %s removed successfully", addr)
return nil
}
// genericCommand requests consensus for applying a single command.
// This is an internal ham4db implementation
func (store *Store) genericCommand(op string, bytes []byte) (response interface{}, err error) {
if store.raft.State() != raft.Leader {
return nil, fmt.Errorf("not leader")
}
b, err := json.Marshal(&storeCommand{Op: op, Value: bytes})
if err != nil {
return nil, err
}
f := store.raft.Apply(b, raftTimeout)
if err = f.Error(); err != nil {
return nil, err
}
r := f.Response()
if err, ok := r.(error); ok && err != nil {
// This code checks whether the response itself was an error object. If so, it should
// indicate failure of the operation.
return r, err
}
return r, nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。