代码拉取完成,页面将自动刷新
package canal
import (
"fmt"
"time"
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
"github.com/siddontang/go/log"
)
func (c *Canal) startSyncBinlog() error {
pos := mysql.Position{c.master.Name, c.master.Position}
log.Infof("start sync binlog at %v", pos)
s, err := c.syncer.StartSync(pos)
if err != nil {
return fmt.Errorf("start sync replication at %v error %v", pos, err)
}
timeout := time.Second
forceSavePos := false
for {
ev, err := s.GetEventTimeout(timeout)
if err != nil && err != replication.ErrGetEventTimeout {
return err
} else if err == replication.ErrGetEventTimeout {
timeout = 2 * timeout
continue
}
timeout = time.Second
//next binlog pos
pos.Pos = ev.Header.LogPos
forceSavePos = false
switch e := ev.Event.(type) {
case *replication.RotateEvent:
pos.Name = string(e.NextLogName)
pos.Pos = uint32(e.Position)
// r.ev <- pos
forceSavePos = true
log.Infof("rotate binlog to %v", pos)
case *replication.RowsEvent:
// we only focus row based event
if err = c.handleRowsEvent(ev); err != nil {
log.Errorf("handle rows event error %v", err)
return err
}
default:
}
c.master.Update(pos.Name, pos.Pos)
c.master.Save(forceSavePos)
}
return nil
}
func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
ev := e.Event.(*replication.RowsEvent)
// Caveat: table may be altered at runtime.
schema := string(ev.Table.Schema)
table := string(ev.Table.Table)
t, err := c.GetTable(schema, table)
if err != nil {
return err
}
var action string
switch e.Header.EventType {
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
action = InsertAction
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
action = DeleteAction
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
action = UpdateAction
default:
return fmt.Errorf("%s not supported now", e.Header.EventType)
}
events := newRowsEvent(t, action, ev.Rows)
return c.travelRowsEventHandler(events)
}
func (c *Canal) WaitUntilPos(pos mysql.Position, timeout int) error {
if timeout <= 0 {
timeout = 60
}
timer := time.NewTimer(time.Duration(timeout) * time.Second)
for {
select {
case <-timer.C:
return fmt.Errorf("wait position %v err", pos)
default:
curpos := c.master.Pos()
if curpos.Compare(pos) >= 0 {
return nil
} else {
time.Sleep(100 * time.Millisecond)
}
}
}
return nil
}
func (c *Canal) CatchMasterPos(timeout int) error {
rr, err := c.Execute("SHOW MASTER STATUS")
if err != nil {
return err
}
name, _ := rr.GetString(0, 0)
pos, _ := rr.GetInt(0, 1)
return c.WaitUntilPos(mysql.Position{name, uint32(pos)}, timeout)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。