1 Star 0 Fork 0

lin2631 / sc-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
dataloader.go 13.70 KB
一键复制 编辑 原始数据 按行查看 历史
package core
import (
"errors"
"fmt"
"log"
"path"
"gitee.com/lin2631/sc-go/common"
"gitee.com/lin2631/sc-go/db"
)
const (
	MinHourToSimulate = 1  // earliest hour a simulation may start at
	MaxHourToSimulate = 10 // latest hour a simulation may start at

	_MTable = "machineEvents" // name of the db table holding machine events
	_MKey   = "m_events"      // key inside _MTable; value type is []MachineEvent

	_ITable     = "instanceEvents" // name of the db table holding instance events
	_IKeyPrefix = "i_events_"      // key prefix inside _ITable; the numeric suffix is the hour,
	// e.g. i_events_10 maps to a []InstanceEvent covering hour 10 (10:00:00 - 10:59:59)

	_StateTable  = "clusterState" // db table holding hourly cluster-state snapshots
	_StatePrefix = "state_"       // key prefix inside _StateTable; suffix is the hour

	_MEventCsv      = "/clusterdata2019/machine_events/machine_events.csv" // machine events csv, relative to dataDir
	_InstanceCsvDir = "/clusterdata2019/2019Cluster_Event/"                // directory of instance-event csv files (1.csv, 2.csv, ...), relative to dataDir
	_Prefile        = "/ableUsed2019/init/begin.csv"                       // csv describing instances that pre-exist at simulation start
	_MaxFileUse     = 10                                                   // number of instance-event csv files to load
)
// DataLoader is the preprocessor and provider of the cluster trace data.
// It converts the raw csv data into binary form and stores it as a cache
// (powered by boltdb), and serves the cluster's state snapshots and events.
type DataLoader struct {
	dataDir  string // root directory of the raw csv data
	cacheDir string // directory where the cache database file lives
	dbName   string // full path of the cache database file (cacheDir/scgo.db)
	cachedb  *db.DB // handle to the cache database
}
// NewDataLoader opens (or creates) the cache database under cacheDir and,
// when the cache is empty, performs the one-off preprocessing of the raw
// csv data found in dataDir. Panics if the database cannot be opened.
func NewDataLoader(dataDir string, cacheDir string) *DataLoader {
	log.Println("DataLoader initializing...")
	loader := &DataLoader{
		dataDir:  dataDir,
		cacheDir: cacheDir,
		dbName:   path.Join(cacheDir, "scgo.db"),
	}
	cache, err := db.OpenDB(loader.dbName)
	if err != nil {
		panic(err)
	}
	loader.cachedb = cache
	// A non-empty cache means the csv data was already converted on a
	// previous run, so the long preprocessing wait can be skipped.
	if loader.cachedb.IsEmpty() {
		preprocess(loader.cachedb, dataDir)
	}
	return loader
}
// InitialState returns the cluster state at the beginning of targetHour,
// recovered from the snapshot stored in the clusterState table.
// targetHour must lie in [MinHourToSimulate, MaxHourToSimulate]; any
// failure to read the snapshot panics (consistent with the rest of the file).
func (dl *DataLoader) InitialState(targetHour int) ClusterState {
	if targetHour < MinHourToSimulate || targetHour > MaxHourToSimulate {
		log.Panicln("wrong target hour params,range [", MinHourToSimulate, ",", MaxHourToSimulate, "]")
	}
	stateTable, err := dl.cachedb.OpenDBTable(_StateTable)
	if err != nil {
		panic(err)
	}
	var snapshot CRClusterStateSnapShot
	// BUG FIX: the Load error was silently ignored, so a failed read would
	// return a zero-value snapshot instead of surfacing the problem.
	if err := stateTable.Load(fmt.Sprintf("%s%d", _StatePrefix, targetHour), &snapshot); err != nil {
		panic(err)
	}
	return snapshot.Recover()
}
// GetInstanceEventBetween returns the instance events from startHour
// (inclusive) to endHour (exclusive). For example startHour=8, endHour=10
// yields the events of hours 8 and 9, but not hour 10.
func (dl *DataLoader) GetInstanceEventBetween(startHour, endHour int) (res []InstanceEvent, err error) {
	if startHour >= endHour {
		// BUG FIX: the message previously misspelled "startHour" as "starHour".
		return nil, errors.New("startHour is no less than endHour")
	}
	t, err := dl.cachedb.OpenDBTable(_ITable)
	if err != nil {
		return nil, errors.New("can not OpenDBTable")
	}
	for i := startHour; i < endHour; i++ {
		key := fmt.Sprintf("%s%d", _IKeyPrefix, i)
		var hourEvents []InstanceEvent
		if err = t.Load(key, &hourEvents); err != nil {
			return nil, err
		}
		res = append(res, hourEvents...)
	}
	return res, nil
}
// GetMachineEvent returns the machine events from startHour (inclusive) to
// endHour (exclusive). Hours without events are simply skipped.
func (dl *DataLoader) GetMachineEvent(startHour, endHour int) (res []MachineEvent, err error) {
	if startHour >= endHour {
		// BUG FIX: the message previously misspelled "startHour" as "starHour".
		return nil, errors.New("startHour is no less than endHour")
	}
	meventsTable := getMachineEventMap(dl.cachedb)
	for i := startHour; i < endHour; i++ {
		// single map lookup instead of the original check-then-index pair
		if events, ok := meventsTable[i]; ok {
			res = append(res, events...)
		}
	}
	return
}
// AllMachineEvent returns every machine event stored in the cache db.
func (dl *DataLoader) AllMachineEvent() (res []MachineEvent, err error) {
	t, err := dl.cachedb.OpenDBTable(_MTable)
	if err != nil {
		// Wrap the underlying error instead of discarding it, so callers
		// can see why the table could not be opened.
		return nil, fmt.Errorf("can not OpenDBTable: %w", err)
	}
	err = t.Load(_MKey, &res)
	return res, err
}
// ====================================================== # below is private =========================================

// preprocess converts the raw csv data into the binary cache: machine
// events first, then instance events, and finally the per-hour cluster
// state snapshots (which depend on the first two steps).
func preprocess(cacheDb *db.DB, dataDir string) {
	steps := []func(*db.DB, string){
		machineEvent_to_DB,
		instanceEvent_to_DB,
		calClusterState_to_DB,
	}
	for _, step := range steps {
		step(cacheDb, dataDir)
	}
}
////================ # read machine events to db=============================================

// machineEvent_to_DB parses the machine-events csv and stores the whole
// list under a single key in the machineEvents table.
func machineEvent_to_DB(cacheDb *db.DB, dataDir string) {
	csvPath := path.Join(dataDir, _MEventCsv)
	rows, header := CsvToList(csvPath)
	log.Println(header)
	log.Println(len(rows))
	events := make([]MachineEvent, len(rows))
	for idx, row := range rows {
		events[idx] = string2MachineEvent(row)
	}
	log.Println(len(events))
	table, err := cacheDb.OpenDBTable(_MTable)
	if err != nil {
		panic(err)
	}
	table.Store(_MKey, events)
}
////================ # read instance events to db==================================

// instanceEvent_to_DB loads instance-event csv files 1.._MaxFileUse into
// the db using the pipelined loader below.
func instanceEvent_to_DB(cacheDb *db.DB, dataDir string) {
	instanceEvent_to_DB_files(cacheDb, dataDir, 1, _MaxFileUse)
}
// instanceEvent_to_DB_files loads csv files startfile..endfile (inclusive)
// and stores their instance events into the db, grouped by hour.
//
// A two-stage pipeline is used: one goroutine reads a csv file from disk
// and converts it into event structs, while a second goroutine splits each
// batch by hour and persists it:
//
//	csv file --> worker1 --> instanceEvents --> worker2 --> db
//
// After endfile-startfile+1 batches the consumer signals on allFinish.
func instanceEvent_to_DB_files(cacheDb *db.DB, dataDir string, startfile, endfile int) {
	loadedEvents := make(chan []InstanceEvent, 1)
	allFinish := make(chan int, 1)
	// producer: read and parse one csv file per iteration
	go func() {
		for i := startfile; i <= endfile; i++ {
			fileName := path.Join(dataDir, _InstanceCsvDir, fmt.Sprintf("%d.csv", i))
			datas, _ := CsvToList(fileName)
			loadedEvents <- string2InstanceEventList(datas)
		}
	}()
	// consumer: split each batch by hour and persist it
	go func() {
		for i := startfile; i <= endfile; i++ {
			events := <-loadedEvents
			// splitEventListByHour wants []IEvent; Go cannot convert
			// []InstanceEvent to []IEvent directly, so wrap each element.
			ievents := make([]IEvent, len(events))
			for j := range events {
				ievents[j] = &events[j]
			}
			for _, div := range splitEventListByHour(ievents) {
				saveInstanceEvents2DB(cacheDb, div.hour, events[div.startIndex:div.endIndex])
			}
			// BUG FIX: the finish log used to rebuild the path with a
			// hard-coded "/2019Cluster_Event/" and so printed a file name
			// different from the one actually loaded (it was missing the
			// "clusterdata2019" prefix). Reuse _InstanceCsvDir instead.
			log.Println("finish ", path.Join(dataDir, _InstanceCsvDir, fmt.Sprintf("%d.csv", i)))
		}
		allFinish <- 1
	}()
	<-allFinish
}
// saveInstanceEvents2DB appends the events e to the per-hour key of the
// instanceEvents table, merging with whatever is already stored there.
func saveInstanceEvents2DB(cacheDb *db.DB, hour int64, e []InstanceEvent) {
	table, err := cacheDb.OpenDBTable(_ITable)
	if err != nil {
		panic(err)
	}
	key := fmt.Sprintf("%s%d", _IKeyPrefix, hour)
	var existing []InstanceEvent
	// A Load error means nothing is stored under this key yet; store e as-is.
	if err := table.Load(key, &existing); err != nil {
		table.Store(key, e)
		return
	}
	merged := append(existing, e...)
	table.Store(key, &merged)
}
// splitEventListByHour splits a time-ordered event list into contiguous
// segments that belong to the same hour.
// Each returned entry means: e[startIndex:endIndex) happened during `hour`.
// An empty input yields no segments.
func splitEventListByHour(e []IEvent) (res []struct {
	hour       int64
	startIndex int
	endIndex   int
}) {
	// BUG FIX: the original indexed e[0] unconditionally and panicked on an
	// empty slice.
	if len(e) == 0 {
		return nil
	}
	// Alias of the anonymous result element type, so the three copies of the
	// struct declaration collapse into one.
	type segment = struct {
		hour       int64
		startIndex int
		endIndex   int
	}
	curHour := e[0].HappenTime() / HOUR
	lastSplit := 0
	for i, ev := range e {
		h := ev.HappenTime() / HOUR
		if h != curHour {
			res = append(res, segment{hour: curHour, startIndex: lastSplit, endIndex: i})
			curHour = h
			lastSplit = i
		}
	}
	// close the final segment
	res = append(res, segment{hour: curHour, startIndex: lastSplit, endIndex: len(e)})
	if len(res) > 1 {
		for _, v := range res {
			log.Println(v.hour, e[v.startIndex].HappenTime(), e[v.endIndex-1].HappenTime())
		}
	}
	return
}
////=================# calculate the cluster states to db=================================

// calClusterState_to_DB replays the stored machine and instance events hour
// by hour and persists a snapshot of the cluster state at each whole hour
// (keys state_1 .. state_MaxHourToSimulate) into the clusterState table.
// It assumes the machine/instance event tables were already filled by the
// earlier preprocessing steps; any db failure panics.
func calClusterState_to_DB(cacheDb *db.DB, dataDir string) {
	itable, err := cacheDb.OpenDBTable(_ITable)
	if err != nil {
		panic(err)
	}
	meventsTable := getMachineEventMap(cacheDb)
	// calculate the state at hour 1, starting from an empty cluster state
	c := InitCRClusterState().(*CRClusterState)
	// get the instance events of hour 0 plus the synthetic prefile events
	// that bootstrap the pre-existing instances
	var ievents []InstanceEvent
	err = itable.Load(fmt.Sprintf("%s%d", _IKeyPrefix, 0), &ievents)
	if err != nil {
		panic(err)
	}
	preEvents := prefileEvent(dataDir)
	ievents = append(preEvents, ievents...)
	// replay every event of hour 0
	handleOneBatchEvents(c, ievents, meventsTable[0])
	c.SetTime(1 * HOUR)
	snapshot := c.SnapShot()
	stateTable, err := cacheDb.OpenDBTable(_StateTable)
	if err != nil {
		panic(err)
	}
	// save the cluster state as it is when the time is 1 hour
	stateTable.Store(fmt.Sprintf("%s%d", _StatePrefix, 1), snapshot)
	for _, m := range c.GetNodes().GetAllocs() {
		log.Println(m)
	}
	// advance hour by hour: replay the events of curHour, then snapshot
	// the resulting state as state_(curHour+1)
	for curHour := 1; curHour < MaxHourToSimulate; curHour++ {
		// a Load failure here means the needed hour of data is missing
		var ievents []InstanceEvent
		err := itable.Load(fmt.Sprintf("%s%d", _IKeyPrefix, curHour), &ievents)
		if err != nil {
			panic(err)
		}
		targetHour := curHour + 1
		log.Println("start initializing: ", targetHour)
		handleOneBatchEvents(c, ievents, meventsTable[curHour])
		c.SetTime(int64(targetHour) * HOUR)
		// save the state when the time is curHour+1
		snapshot := c.SnapShot()
		stateTable.Store(fmt.Sprintf("%s%d", _StatePrefix, curHour+1), snapshot)
	}
}
// prefileEvent builds the synthetic events that initialize the cluster's
// pre-existing instances: for every distinct instance found in the prefile
// csv it emits an enable event at t=1s followed by a schedule event at
// t=2s, so no instance is ever scheduled without having been enqueued first.
func prefileEvent(dataDir string) []InstanceEvent {
	prefile := path.Join(dataDir, _Prefile)
	table, tabletop := CsvToList(prefile)
	log.Println("=========================================")
	log.Println(tabletop)
	events := string2InstanceEventList(table)
	// Deduplicate by instance id, keeping the last event of each instance.
	record := make(map[InstanceKey]InstanceEvent)
	for _, e := range events {
		record[e.InstanceId] = e
	}
	// Pre-size: exactly one enable plus one schedule event per instance.
	res := make([]InstanceEvent, 0, 2*len(record))
	for _, e := range record {
		e.Time = 1 * SECOND
		e.Type = INSTANCEEnable
		res = append(res, e)
	}
	// FIX: the original aliased res via `temp := res` and appended to res
	// while ranging over temp, which works only by accident of append
	// semantics. Snapshot the enable-event count explicitly instead.
	enableCount := len(res)
	for _, e := range res[:enableCount] {
		e.Time = 2 * SECOND
		e.Type = INSTANCESchdule
		res = append(res, e)
	}
	return res
}
// getMachineEventMap loads all machine events from the db and returns them
// grouped by the hour in which they happen.
func getMachineEventMap(cacheDb *db.DB) (res map[int][]MachineEvent) {
	mtable, err := cacheDb.OpenDBTable(_MTable)
	if err != nil {
		panic(err)
	}
	var mevents []MachineEvent
	// BUG FIX: the Load error was assigned but never checked, so a failed
	// read would silently produce an empty event map.
	if err := mtable.Load(_MKey, &mevents); err != nil {
		panic(err)
	}
	return divideMachineEventsByHour(mevents)
}
// divideMachineEventsByHour groups a time-ordered machine-event list by the
// hour in which each event happens. The sub-slices share mevents' backing
// array. An empty input yields an empty map.
func divideMachineEventsByHour(mevents []MachineEvent) (res map[int][]MachineEvent) {
	res = make(map[int][]MachineEvent)
	// BUG FIX: with an empty input the original built divs == [0, 0] and
	// then indexed mevents[0], causing an out-of-range panic.
	if len(mevents) == 0 {
		return res
	}
	// divs holds the indices where mevents[i] and mevents[i-1] belong to
	// different hours, plus the two ends of the slice.
	var divs []int
	divs = append(divs, 0)
	for i := 1; i < len(mevents); i++ {
		if mevents[i].HappenTime()/HOUR != mevents[i-1].HappenTime()/HOUR {
			divs = append(divs, i)
		}
	}
	divs = append(divs, len(mevents))
	for i := 0; i < len(divs)-1; i++ {
		start := divs[i]
		end := divs[i+1]
		h := mevents[start].HappenTime() / HOUR
		res[int(h)] = mevents[start:end]
	}
	return res
}
// handleOneBatchEvents merges one batch of machine and instance events into
// a single timeline and applies them to the cluster state in time order.
func handleOneBatchEvents(c ClusterState, ievents []InstanceEvent, mevents []MachineEvent) {
	allevents := make([]IEvent, 0, len(mevents)+len(ievents))
	for i := range mevents {
		allevents = append(allevents, &mevents[i])
	}
	for i := range ievents {
		allevents = append(allevents, &ievents[i])
	}
	timeline := InitTimeline(allevents)
	for count := 0; !timeline.IsEmpty(); count++ {
		log.Println("count", count)
		event, err := timeline.OutFirst()
		if err != nil {
			panic(err)
		}
		EventHandle(c, event)
	}
}
//=============== # some utils =======================================================
//instance_events :
//0 time
//1 missing_type
//2 collection_id
//3 instance_index
//4 machine_id
//5 event_type
//6 username
//7 scheduling_class
//8 priority
//9 request_cpu
//10 request_memory
//11 alloc_collection_id
//12 alloc_instance_index
// FileToInstanceEventList reads the csv file at fileName and parses every
// row into an InstanceEvent.
func FileToInstanceEventList(fileName string) []InstanceEvent {
	rows, _ := CsvToList(fileName)
	return string2InstanceEventList(rows)
}
// string2InstanceEventList converts raw csv rows into InstanceEvents,
// one event per row.
func string2InstanceEventList(datas [][]string) []InstanceEvent {
	events := make([]InstanceEvent, len(datas))
	for i := range datas {
		events[i] = string2InstanceEvent(datas[i])
	}
	return events
}
// string2InstanceEvent parses one instance-event csv row into an
// InstanceEvent. Column indices follow the layout documented above
// (0=time, 2=collection_id, 3=instance_index, 4=machine_id, 5=event_type,
// 7=scheduling_class, 8=priority, 9=request_cpu, 10=request_memory,
// 11=alloc_collection_id).
func string2InstanceEvent(s []string) (e InstanceEvent) {
	return InstanceEvent{
		Event:      Event{Time: common.Str_to_int64(s[0]), Object: E_INSTANCE},
		Type:       common.Str_to_int64(s[5]),
		InstanceId: InstanceKey{common.Str_to_int64(s[2]), common.Str_to_int64(s[3])},
		// When missing_type is non-zero the google trace row may be damaged
		// and the cpu/ram request fields can be absent; there is nothing
		// better to do than default to 0. Such rows are rare, so hopefully
		// the impact is small.
		CpuRequest: strToFloat64WithZeroDefault(s[9]),
		RamRequest: strToFloat64WithZeroDefault(s[10]),
		// MachineID is empty for event types (submit, enqueue, ...) where no
		// machine has been assigned yet; default it to 0 in that case.
		MachineID:     MachineID(strToInt64WithZeroDefault(s[4])),
		AllocID:       common.Str_to_int64(s[11]),
		ScheduleClass: common.Str_to_int64(s[7]),
		Priority:      common.Str_to_int64(s[8]),
	}
}
// 0 timestamp
// 1 machine_id
// 2 event_type
// 3 platform_id
// 4 cpu_capacity
// 5 memory_capacity
// string2MachineEventList converts raw csv rows into MachineEvents,
// one event per row.
func string2MachineEventList(datas [][]string) []MachineEvent {
	events := make([]MachineEvent, len(datas))
	for i := range datas {
		events[i] = string2MachineEvent(datas[i])
	}
	return events
}
// string2MachineEvent parses one machine-event csv row into a MachineEvent.
// Column indices follow the layout documented above (0=timestamp,
// 1=machine_id, 2=event_type, 4=cpu_capacity, 5=memory_capacity).
func string2MachineEvent(s []string) (e MachineEvent) {
	return MachineEvent{
		Event:     Event{Time: common.Str_to_int64(s[0]), Object: E_MACHINE},
		MachineId: MachineID(common.Str_to_int64(s[1])),
		Type:      common.Str_to_int64(s[2]),
		// Machine-add events sometimes lack capacity info; that is fine,
		// because a later update event supplies it.
		CpuCapacity: strToFloat64WithZeroDefault(s[4]),
		RamCapacity: strToFloat64WithZeroDefault(s[5]),
	}
}
// strToFloat64WithZeroDefault parses s as a float64, returning 0 when s is
// not a legal float (e.g. a missing csv field).
func strToFloat64WithZeroDefault(s string) float64 {
	if common.IsLegalFloat64(s) {
		return common.Str_to_float64(s)
	}
	// idiom: no else after a terminating if
	return 0
}
// strToInt64WithZeroDefault parses s as an int64, returning 0 when s is
// not a legal integer (e.g. a missing csv field).
func strToInt64WithZeroDefault(s string) int64 {
	if common.IsLegalInt64(s) {
		return common.Str_to_int64(s)
	}
	// idiom: no else after a terminating if
	return 0
}
1
https://gitee.com/lin2631/sc-go.git
git@gitee.com:lin2631/sc-go.git
lin2631
sc-go
sc-go
6252159ce97a

搜索帮助