diff --git a/manager/handlers.go b/manager/handlers.go new file mode 100644 index 0000000000000000000000000000000000000000..7c81e95d771c6081f632175967bbcacbe66fe94b --- /dev/null +++ b/manager/handlers.go @@ -0,0 +1,253 @@ +package manager + +import ( + "os" + "io" + "fmt" + // "net/url" + // "syscall" + "time" + // gzip "github.com/klauspost/pgzip" + "math/rand" + "archive/tar" + "io/ioutil" + "path/filepath" + "strconv" + "net/http" + "github.com/labstack/echo" + "github.com/seveirbian/gear/pkg" + "github.com/seveirbian/gear/types" +) + +var ( + +) + +func handleNodes(c echo.Context) error { + var resp string + + mgr.NodesMu.RLock() + + for _, node := range(mgr.Nodes) { + resp = resp + strconv.FormatUint(node.ID, 10) + ":" + node.IP + ":" + node.Port + ";" + } + + defer mgr.NodesMu.RUnlock() + + return c.String(http.StatusOK, resp) +} + +func handleJoin(c echo.Context) error { + mIP := c.Param("IP") + mPort := c.Param("Port") + + id := pkg.CreateIdFromIP(mIP) + + // 对Nodes数据结构加读锁 + mgr.NodesMu.RLock() + + _, ok := mgr.Nodes[id] + + // 解锁 + mgr.NodesMu.RUnlock() + + if !ok { + // 对Nodes数据结构加写锁 + mgr.NodesMu.Lock() + + mgr.Nodes[id] = types.Node{ + ID: id, + IP: mIP, + Port: mPort, + } + + // 解锁 + mgr.NodesMu.Unlock() + } + + return c.NoContent(http.StatusOK) +} + +func handlePull(c echo.Context) error { + cid := c.Param("CID") + + // fmt.Println(filepath.Join(GearStoragePath, cid)) + + _, err := os.Lstat(filepath.Join(GearStoragePath, cid)) + if err != nil { + logger.Warnf("Fail to lstat file for %v", err) + } + + // 返回本地文件 + // f, err := os.Open(filepath.Join(GearStoragePath, cid)) + // if err != nil { + // logger.Fatalf("Fail to open file: %s\n", filepath.Join(GearStoragePath, cid)) + // } + // defer f.Close() + // 上共享文件锁 + // err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH) + // if err != nil { + // logger.Fatal("Fail to lock file in a sharing way...") + // } + err = c.Attachment(filepath.Join(GearStoragePath, cid), cid) + if err != nil { + logger.Fatal("Fail to return file...") + } + // 解锁 + // err = syscall.Flock(int(f.Fd()), syscall.LOCK_UN) + // if err != nil { + // logger.Fatal("Fail to unlock file in a sharing way...") + // } + + return nil +} + +func handleQuery(c echo.Context) error { + cid := c.Param("CID") + + fmt.Printf("Querying %s\n", cid) + + _, err := os.Lstat(filepath.Join(GearStoragePath, cid)) + if err != nil { + return c.NoContent(http.StatusNotFound) + } + + return c.NoContent(http.StatusOK) +} + +func handlePush(c echo.Context) error { + cid := c.Param("CID") + file, err := c.FormFile("file") + if err != nil { + logger.Warnf("Fail to get formfile for %v", err) + } + + // 检测是否已经存在cid文件 + _, err = os.Lstat(filepath.Join(GearStoragePath, cid)) + if err != nil { + // storage端没有cid文件,下载到storage + src, err := file.Open() + if err != nil { + logger.Warnf("Fail to open file for %v", err) + return c.NoContent(http.StatusInternalServerError) + } + defer src.Close() + + dst, err := os.Create(filepath.Join(GearStoragePath, cid)) + if err != nil { + logger.Warnf("Fail to create file for %v", err) + return c.NoContent(http.StatusInternalServerError) + } + defer dst.Close() + + _, err = io.Copy(dst, src) + if err != nil { + logger.Warnf("Fail to copy for %V", err) + return c.NoContent(http.StatusInternalServerError) + } + } + + // storage已经有cid文件,直接返回 + return c.NoContent(http.StatusOK) +} + +func handlePreFetch(c echo.Context) error { + t := time.Now() + + values, err := c.FormParams() + if err != nil { + logger.Warnf("Fail to get form params for %v", err) + } + + files := values["files"] + + rand.Seed(time.Now().Unix()) + tmpFileName := strconv.Itoa(rand.Int()) + + fmt.Println(tmpFileName) + + defer os.Remove(filepath.Join(GearStoragePath, tmpFileName)) + + tmpFile, err := os.Create(filepath.Join(GearStoragePath, tmpFileName)) + if err != nil { + logger.Warnf("Fail to create file for %v", err) + } + + tw := tar.NewWriter(tmpFile) + + for _, file := range files { + f, err := os.Stat(filepath.Join(GearStoragePath, file)) + if err != nil { + logger.Warnf("Fail to stat file for %v", err) + continue + } + + hd, err := tar.FileInfoHeader(f, "") + if err != nil { + logger.Warn("Fail to get file head...") + return err + } + + err = tw.WriteHeader(hd) + if err != nil { + logger.WithField("err", err).Warn("Fail to write header info") + return err + } + + b, err := ioutil.ReadFile(filepath.Join(GearStoragePath, file)) + if err != nil { + logger.Warnf("Fail to read file for %v", err) + } + + _, err = tw.Write(b) + if err != nil { + logger.WithField("err", err).Warn("Fail to write content...") + return err + } + } + + tw.Close() + tmpFile.Close() + + fmt.Println("tar time: ", time.Since(t)) + + // 再压缩,使用gzip + // gzipFile, err := os.Create(filepath.Join(GearStoragePath, tmpFileName+"gzip")) + // if err != nil { + // logger.Warnf("Fail to create gzip file for %v", err) + // } + + // gw := gzip.NewWriter(gzipFile) + + // tarContent, err := ioutil.ReadFile(filepath.Join(GearStoragePath, tmpFileName)) + // if err != nil { + // logger.Warnf("Fail to read tmp file for %v", err) + // } + + // _, err = gw.Write(tarContent) + // if err != nil { + // logger.Warnf("Fail to write gzip file for %v", err) + // } + + // gw.Close() + // gzipFile.Close() + + err = c.Attachment(filepath.Join(GearStoragePath, tmpFileName), tmpFileName) + if err != nil { + logger.Fatal("Fail to return file...") + } + + fmt.Println("Time used: ", time.Since(t)) + + fmt.Println("File number: ", len(files)) + + return nil +} + + + + + + + + diff --git a/manager/manager.go b/manager/manager.go new file mode 100644 index 0000000000000000000000000000000000000000..158a1b365fc2aa354c23ab98e2264bb2a5697624 --- /dev/null +++ b/manager/manager.go @@ -0,0 +1,139 @@ +package manager + +import ( + "fmt" + "sync" + "time" + "net" + "net/http" + "path/filepath" + "github.com/labstack/echo" + "github.com/sirupsen/logrus" + "github.com/seveirbian/gear/pkg" + "github.com/seveirbian/gear/types" +) + +var ( + logger = logrus.WithField("gear", "manager") +) + +var ( + GearPath = "/var/lib/gear/" + GearGzipPath = filepath.Join(GearPath, "gzip") + GearStoragePath = filepath.Join(GearPath, "storage") +) + +var ( + port = "2019" + mgr Manager = Manager{} +) + +type Manager struct { + Self types.Node + + Echo *echo.Echo + + NodesMu sync.RWMutex + Nodes map[uint64]types.Node + + MonitorIp string + MonitorPort string +} + +func Init() (*Manager, error) { + // 1. create echo instance + e := echo.New() + + // 2. add routes + e.GET("/nodes", handleNodes) + e.POST("/join/:IP/:Port", handleJoin) + e.POST("/pull/:CID", handlePull) + e.POST("/query/:CID", handleQuery) + e.POST("/push/:CID", handlePush) + + e.POST("/prefetch", handlePreFetch) + + // e.POST("/report/:IMAGE", handleReport) + + // 3. get self's IP + ip := pkg.GetSelfIp() + + // 4. create self ID + id := pkg.CreateIdFromIP(ip) + + // 5. fill the manager's fileds + mgr.Self = types.Node{ + ID: id, + IP: ip, + Port: port, + } + mgr.Echo = e + mgr.NodesMu = sync.RWMutex{} + mgr.Nodes = map[uint64]types.Node{} + + // 6. Monitor's ip and port + mgr.MonitorIp = pkg.GetSelfIp() + mgr.MonitorPort = "2021" + + return &mgr, nil +} + +func (m *Manager) Start() { + fmt.Printf("Manager IP: %s\n", m.Self.IP) + + // 需要一个协程定期查询所有节点是否在线 + go updateNodes(m) + + m.start() +} + +func (m *Manager) start() { + m.Echo.Logger.Fatal(m.Echo.Start(":" + m.Self.Port)) +} + +func updateNodes(m *Manager) { + for { + nodes, err := m.askNodes() + if err != nil { + logger.Fatal("Fail to ask cluster nodes...") + } + + m.NodesMu.Lock() + for _, node := range(nodes) { + delete(m.Nodes, node.ID) + } + m.NodesMu.Unlock() + + time.Sleep(60*time.Second) + } +} + +func (m *Manager) askNodes() ([]types.Node, error) { + var quitedNodes = []types.Node{} + + for _, node := range(m.Nodes) { + c := http.Client{ + Transport: &http.Transport{ + Dial: func(netw, addr string) (net.Conn, error) { + // deadline := time.Now().Add(25 * time.Second) + c, err := net.DialTimeout(netw, addr, time.Second*5) + if err != nil { + return nil, err + } + // c.SetDeadline(deadline) + return c, nil + }, + }, + } + + _, err := c.Get("http://"+node.IP+":"+node.Port+"/info") + if err != nil { + quitedNodes = append(quitedNodes, node) + } + } + + return quitedNodes, nil +} + + + diff --git a/push/push.go b/push/push.go new file mode 100644 index 0000000000000000000000000000000000000000..f6ea1627d9b39f0d917a52bc9f8959eb8bdfe04e --- /dev/null +++ b/push/push.go @@ -0,0 +1,196 @@ +package push + +import ( + "fmt" + "io" + "os" + "bytes" + "strings" + "net/url" + "net/http" + "mime/multipart" + // "crypto/md5" + // "archive/tar" + // "encoding/json" + "path/filepath" + + // "github.com/docker/docker/api/types" + // "github.com/seveirbian/gear/pkg" + // "github.com/docker/docker/client" + // "github.com/docker/docker/daemon/graphdriver/overlay2" + "github.com/sirupsen/logrus" + // "golang.org/x/net/context" +) + +var ( + logger = logrus.WithField("gear", "push") +) + +type Pusher struct { + StorageIP string + StoragePort string + + GFilesDir string + FilesToSent map[string]string + + DoNotClean bool +} + +func InitPusher(path, ip, port string, doNotClean bool) (*Pusher, error) { + noClean := false + if doNotClean == true { + noClean = true + } + + return &Pusher { + StorageIP: ip, + StoragePort: port, + GFilesDir: path, + FilesToSent: map[string]string{}, + DoNotClean: noClean, + }, nil +} + +func (p *Pusher) Push() { + // 遍历普通文件目录,将所有文件添加到待push的字典中 + // xxxx/files 文件夹 + err := filepath.Walk(p.GFilesDir, func(path string, f os.FileInfo, err error) error { + if f == nil { + return err + } + + // 放置将files文件夹也传过去 + pathSlice := strings.SplitAfter(path, p.GFilesDir) + if pathSlice[1] == "" { + return nil + } + + p.FilesToSent[f.Name()] = path + + return nil + }) + + if err != nil { + logger.Warnf("Fail to walk dir for %v", err) + } + + // 将字典中所有文件都询问manager,如果该文件已经存在storage中,则将其从字典中删除 + fmt.Println("Querying...") + toDelete := map[string]string{} + for cid, path := range p.FilesToSent { + resp, err := http.PostForm("http://"+p.StorageIP+":"+p.StoragePort+"/query/"+cid, url.Values{}) + if err != nil { + logger.Warnf("Fail to query cid for %v", err) + } + + if resp.StatusCode == http.StatusOK { + toDelete[cid] = path + } + } + + for cid, _ := range toDelete { + delete(p.FilesToSent, cid) + } + + fmt.Println("Uploading...") + for cid, path := range p.FilesToSent { + // 创建表单文件 + // CreateFormFile 用来创建表单,第一个参数是字段名,第二个参数是文件名 + buf := new(bytes.Buffer) + writer := multipart.NewWriter(buf) + formFile, err := writer.CreateFormFile("file", cid) + if err != nil { + logger.Warnf("Fail to create form file failed: %v", err) + } + // 从文件读取数据,写入表单 + srcFile, err := os.Open(path) + if err != nil { + logger.Warnf("Fail to open source file for %v", err) + } + _, err = io.Copy(formFile, srcFile) + if err != nil { + logger.Warnf("Fail to write to form file for %v", err) + } + // 发送表单 + contentType := writer.FormDataContentType() + writer.Close() // 发送之前必须调用Close()以写入结尾行 + _, err = http.Post("http://"+p.StorageIP+":"+p.StoragePort+"/push/"+cid, contentType, buf) + if err != nil { + logger.Fatalf("Post failed: %s\n", err) + } + srcFile.Close() + } + + fmt.Println("Push OK!") + + if !p.DoNotClean { + fmt.Println("Cleaning up dir: ", p.GFilesDir) + err = os.RemoveAll(p.GFilesDir) + if err != nil { + logger.Warnf("Fail to remove all files under p.GFilesDir for %v", err) + } + + fmt.Println("Clean up OK!") + } +} + +func ParseImage(image string) (imageName string, imageTag string) { + registryAndImage := strings.Split(image, "/") + + // dockerhub镜像 + if len(registryAndImage) == 1 { + imageAndTag := strings.Split(image, ":") + + switch len(imageAndTag) { + case 1: + logger.Warn("No image tag provided, use \"latest\"\n") + imageName = imageAndTag[0] + imageTag = "latest" + case 2: + imageName = imageAndTag[0] + imageTag = imageAndTag[1] + } + } + + // 私有仓库镜像 + if len(registryAndImage) == 2 { + imageAndTag := strings.Split(registryAndImage[1], ":") + + switch len(imageAndTag) { + case 1: + logger.Warn("No image tag provided, use \"latest\"\n") + imageName = registryAndImage[0] + "/" + imageAndTag[0] + imageTag = "latest" + case 2: + imageName = registryAndImage[0] + "/" + imageAndTag[0] + imageTag = imageAndTag[1] + } + } + + return +} + + + + + + + + + + + + + + + + + + + + + + + + +