1 Star 1 Fork 0

laogg/csv2mysql

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
csv2mysql.go 19.37 KB
一键复制 编辑 原始数据 按行查看 历史
/* this is jordan gg ,程序员黑洞同学
*/
package main
import (
"archive/zip"
"bufio"
"log"
"math"
"regexp"
"database/sql"
"fmt"
_ "github.com/go-sql-driver/mysql"
"golang.org/x/text/encoding/simplifiedchinese"
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"strings"
)
//
//1、查找F:/download2/下的messageRecord_20200713*.zip receptionRecord20200713*.zip serviceWorkload_20200713*.zip
//2、解压出csv file
//3、csv file serviceWorkload_20200713*.csv 需要删除最后一行和首行
//4、导入myslq
//5、执行sql 导入数据库
//6、导入成功后把对应zip移动到子目录下
// srcFile could be a single file or a directory
var zhichifileDir="F:/download2"
var db *sql.DB
var strdatetime string
var fileslist =make(map[string] string)
var serviceWork_csv =[]string{}
var receptionrecord_csv =[]string{}
var messagerecord_csv =[]string{}
var gf os.File
var gloger log.Logger
func main() {
if 2 == len(os.Args) {
zhichifileDir = os.Args[1]
} else if 2 != len(os.Args){
dir, _ := filepath.Abs(filepath.Dir(os.Args[0]));log.Println(dir)
log.Print("\nUsage: zhichi <zip 文件所在 dir> \nex: zhichi f:/download ")
zhichifileDir = dir
}
os.Chdir(zhichifileDir);log.Println(zhichifileDir)
db = openmysql()
defer db.Close()
//
gf, err := os.OpenFile("logzhichi.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {log.Fatal(err)}
defer gf.Close()
log.SetOutput(gf) //gloger= log.New(gf,time.Now().String(),log.Ldate | log.Ltime)
// 检查数据库和本地时间
mid_db_weekname := checkexists_weektable() //最大中间表时间
maxweekstring := checkexists_filecsv(zhichifileDir)//最大zip文件时间
strdatetime = maxweekstring //20200720-20200726
if mid_db_weekname >= maxweekstring {
log.Println(maxweekstring, "数据库中 已经有最新周报了")
}else {
log.Println("\n",maxweekstring, "需要导入:")
delete_csv(zhichifileDir, ".csv")
GetAllFile(zhichifileDir, strdatetime)
//校准列与数据
init_slice_csv("serviceworkload", &serviceWork_csv)
init_slice_csv("receptionrecord", &receptionrecord_csv)
init_slice_csv("messagerecord", &messagerecord_csv)
//开始导入事务
tx, err := db.Begin()
if err != nil {
log.Println("mysql 启动事务失败",err)
return
}else{
rows:=insertintomysql(tx, "serviceworkload", &serviceWork_csv)
if 0==rows{
log.Println("serviceworkload:0 !!!!\n\n\n")
}else{
log.Printf("serviceworkload:%v",rows)
}
rows=insertintomysql(tx, "receptionrecord", &receptionrecord_csv)
if 0==rows{
log.Println("receptionrecord:0 !!!!\n\n\n")
}else{
log.Printf("receptionrecord:%v",rows)
}
rows=insertintomysql(tx, "messagerecord", &messagerecord_csv)
if 0==rows{
log.Println("messagerecord:0 !!!!\n\n\n")
}else{
log.Printf("messagerecord:%v",rows)
}
}
if tx.Commit() !=nil{
tx.Rollback() // 如果事务失败,则导入后的中间表也不存在
log.Println("中间表导入事务失败!")
}else{
log.Println("中间表导入完成。")
}
}
// 数据库中是否有最新中间表数据了?比较中间表和终极表
log.Println("比较最大中间表和终极表:")
mid_db_weekname = checkexists_weektable()
check_db1_db2(db,mid_db_weekname)
log.Println("完成")
log.SetOutput(os.Stdout)
}
// 检查终极表中是否有这个周的数据
// 遍历所有中间表 。3个一组
func check_db1_db2(db *sql.DB, strdatetime string) {
mid_db_date := strdatetime[0:4]+"-"+strdatetime[4:6]+"-"+strdatetime[6:8]
sqlstr:="SELECT max(`周起始时间`) as maxweek FROM service_workload;" //2020-07-20
prow,err :=db.Query(sqlstr)
defer prow.Close()
lastdb_week:=""
if err != nil{
log.Printf ("select error: %v\n", err)
}else{
for prow.Next(){
prow.Scan(&lastdb_week)
err = prow.Err()
if err != nil {log.Printf ("scan row error: %v\n", err)}
}
}
if lastdb_week<mid_db_date{
// 事务
tx, err := db.Begin()
if err != nil {
log.Println("mysql 启动事务失败",err)
return
}else{
if nil != serviceworkload(tx, fmt.Sprintf("serviceworkload%s", strdatetime)) ||
nil != receptionrecord(tx, fmt.Sprintf("receptionrecord%s", strdatetime)) ||
nil != messagerecord(tx, fmt.Sprintf("messagerecord%s", strdatetime)){
tx.Rollback()
return
}
}
if tx.Commit() !=nil{
tx.Rollback()
}
}
}
//列数不定问题。检查每行数据列数,必须和title个数相等,否则补足
func init_slice_csv(k string,a *[]string) {
file, err := os.Open(fileslist[k])
if err != nil { fmt.Println(err); return }
defer file.Close()
line := bufio.NewReader(file)
top_n :=1
r :=0
for {
content, _, err := line.ReadLine()
if err == io.EOF { break }
content, err = simplifiedchinese.GBK.NewDecoder().Bytes([]byte(content))
ss :=string(content)
//fmt.Println(string(content)) //content, _ = simplifiedchinese.GBK.NewEncoder().String(content)
if "serviceworkload"==k{
if 1==r { top_n = strings.Count(ss,",")
}else{
if gap := top_n - strings.Count(ss,",");gap>0{
ss += strings.Repeat(",", gap)
}
}
}else if "receptionrecord"==k {
if 0==r { top_n = strings.Count(ss,",")
}else{
if gap := top_n - strings.Count(ss,",");gap>0{
ss += strings.Repeat(",", gap)
}
}
}else if "messagerecord"==k{
if 0==r { top_n = strings.Count(ss,",")
}else{
if gap := top_n - strings.Count(ss,",");gap>0{
ss += strings.Repeat(",", gap)
}
}
}
*a=append(*a, ss)
r++
}
if "serviceworkload"==k{
*a =(*a)[1:len(*a)-1]
}
}
func checkexists_weektable() string {
sqlstr:="SELECT max(substr(table_name,14,17)) as maxweek FROM information_schema.tables WHERE table_schema='tbcenter' AND table_name regexp '^messagerecord[2-9]';" //20200720-20200726
prow,err :=db.Query(sqlstr)
defer prow.Close()
t_n :=""
if err != nil{
log.Printf ("select error: %v\n", err)
}else{
for prow.Next(){
prow.Scan(&t_n)
err = prow.Err()
if err != nil {log.Printf ("scan row error: %v\n", err)}
}
}
return t_n
}
func checkexists_filecsv(pathname string) string {
rd, err := ioutil.ReadDir(pathname)
if err !=nil{log.Println(err)}
var maxweekstring ,s string
var validID = regexp.MustCompile(`^(serviceWorkload_).+(\.csv)$`) //serviceWorkload_20200720-20200726.csv
var sw_reg = regexp.MustCompile(`^(serviceWorkload).+(\.zip)$`)
for _, fi := range rd {
if fi.IsDir() {
} else if validID.MatchString (fi.Name()) || sw_reg.MatchString(fi.Name()){
s=fi.Name()[16:33]
if maxweekstring<s{
maxweekstring = s
fmt.Println(maxweekstring)
}
}
}
return maxweekstring
}
//替换成自己的参数
func openmysql () *sql.DB {
db, err := sql.Open("mysql", "name:pwd@tcp(MYSQLIP:PORT)/defaultdatabase?charset=utf8")
if err != nil {
log.Panic("failed to open mysql ", err)
}
if err = db.Ping(); err != nil {
log.Panic("failed to ping mysql ", err)
}
return db
}
func insertintomysql(tx *sql.Tx,sw string ,sw_csv *[]string ) int {
title :=""
cols :=[]string{}
t_name := fmt.Sprintf("%s%s", sw, strdatetime)
log.Println("\ninsert into mysql: ",t_name)
ssql := fmt.Sprintf("DROP TABLE IF EXISTS `%s`", t_name)
re, err := tx.Exec(ssql)
if err!=nil{log.Println(re, err);tx.Rollback()}
ssql = fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` (LIKE `%s_2020`);", t_name, sw)
re, err = tx.Exec(ssql)
if err!=nil{log.Println(re, err);tx.Rollback()}
if sw == "serviceworkload" {
cols = strings.Split((*sw_csv)[0], `,`)
title = fmt.Sprintf("`周起始时间`,`%s`", strings.Join(cols, "`,`")) //strings.ReplaceAll(sw_csv[0], ",", "','")
weekstartdate := strdatetime[0:4] + "-" + strdatetime[4:6] + "-" + strdatetime[6:8] //20200720-20200726
ssql = fmt.Sprintf("insert into `%s`(%s)values('%s',", t_name, title, weekstartdate)
}else if sw=="receptionrecord"{
cols=strings.Split((*sw_csv)[0],`,`)
title = fmt.Sprintf("`%s`" , strings.Join(cols,"`,`"))
ssql=fmt.Sprintf("insert into `%s`(%s)values(",t_name,title)
}else if sw=="messagerecord"{
cols=strings.Split((*sw_csv)[0],`,`)
title = fmt.Sprintf("`%s`" , strings.Join(cols,"`,`"))
ssql=fmt.Sprintf("insert into `%s`(%s)values(",t_name,title)
}
*sw_csv = (*sw_csv)[1:] //去掉title行
return intomysql(tx,ssql,sw_csv)
}
func intomysql(tx *sql.Tx,ssql string,a *[]string ) int{
var i =0
var row ,sqlstr string
tol :=len(*a)
for i,row = range (*a) {
row = strings.ReplaceAll(row, "\\", " ")
row = strings.ReplaceAll(row, "'", " ")
row = strings.ReplaceAll(row, ",", "','")
sqlstr = ssql + "'" + row + "')"
//log.Println("总行数:",tol,"当前行:",i+1)
if 0 == math.Round(math.Mod((float64(i+1)) , 100)) {
fmt.Printf("\r %d,%d, doing: %.2f%%",i+1,tol,float32(100.00*(i+1)/tol))
}
re, err := tx.Exec(sqlstr)
if err != nil { log.Println(re,err);tx.Rollback()}
}
fmt.Printf("\r %d,%d, done: %.2f%%\n",i+1,tol,float32(100.00*(i+1)/tol))
return i+1
}
func serviceworkload(tx *sql.Tx,t_name string) error {
sql:="insert into service_workload( `周起始时间`,`客服名/技能组`,`客服邮箱`,`登录时长`,`PC端登录时长`,`移动端登录时长`,`在线时长`,`PC端在线时长`,`移动端在线时长` ,`在线时长占比`,`忙碌时长`,`PC端忙碌时长`,`移动端忙碌时长`,`小休时长`,`培训时长`,`会议时长`,`用餐时长`,`活动时长`,`置忙总时长`,`置忙次数`,`置忙率`,`客服首次登录时间`,`客服最后离线时间`,`人工接待客户`,`人工有效接待客户`,`人工独立接待客户`,`新访客`,`老访客`,`老访客占比`,`人工转入客户`,`人工转出客户`,`离线接待客户数`,`咨询会话数`,`独立接待会话`,`有效会话`,`无效会话`,`有效接待会话`,`无效接待会话`,`有效接待率`,`转入会话`,`转出会话`,`未接待会话`,`未接待会话率`,`一次性解决率`,`最大同时接待会话`,`主动发起会话量`,`平均人工接待时长`,`平均总会话时长`,`已解决会话`,`未解决会话`,`未标注是否解决会话`,`回复率`,`抢接会话`,`被抢接会话`,`总人工会话消息数`,`客服消息数`,`客户消息数`,`答问比`,`客服字数`,`客服离线消息数`,`平均首次响应时长`,`首次响应率`,`平均响应时长`,`最大响应时长`,`30s应答率`,`已评价总数`,`参评率`,`客户主动参评数`,`客户主动参评占比`,`客服邀请数`,`客服邀请率`,`客服邀请参评数`,`客服邀请参评占比`,`平均分`,`好评率(5分占比)`,`4分占比`,`3分占比`,`2分占比`,`1分占比`,`线索数量`,`备注数量`)"
sql += " select `周起始时间` ,`客服名/技能组`,`客服邮箱`,`登录时长`,`PC端登录时长`,`移动端登录时长`,`在线时长`,`PC端在线时长`,`移动端在线时长`,`在线时长占比`,`忙碌时长`,`PC端忙碌时长`,`移动端忙碌时长`,`小休时长`,`培训时长`,`会议时长`,`用餐时长`,`活动时长`,`置忙总时长`,`置忙次数`,`置忙率`,`客服首次登录时间`,`客服最后离线时间`,`人工接待客户`,`人工有效接待客户`,`人工独立接待客户`,`新访客`,`老访客`,`老访客占比`,`人工转入客户`,`人工转出客户`,`离线接待客户数`,`咨询会话数`,`独立接待会话`,`有效会话`,`无效会话`,`有效接待会话`,`无效接待会话`,`有效接待率`,`转入会话`,`转出会话`,`未接待会话`,`未接待会话率`,`一次性解决率`,`最大同时接待会话`,`主动发起会话量`,`平均人工接待时长`,`平均总会话时长`,`已解决会话`,`未解决会话`,`未标注是否解决会话`,`回复率`,`抢接会话`,`被抢接会话`,`总人工会话消息数`,`客服消息数`,`客户消息数`,`答问比`,`客服字数`,`客服离线消息数`,`平均首次响应时长`,`首次响应率`,`平均响应时长`,`最大响应时长`,`30s应答率`,`已评价总数`,`参评率`,`客户主动参评数`,`客户主动参评占比`,`客服邀请数`,`客服邀请率`,`客服邀请参评数`,`客服邀请参评占比`,`平均分`,`好评率(5分占比)`,`4分占比`,`3分占比`,`2分占比`,`1分占比`,`线索数量`,`备注数量`"
sql += fmt.Sprintf(" from `%s`",t_name)
re, err := tx.Exec(sql)
if err != nil { log.Println(re,err)}
log.Println("insert into service_workload ok!")
return err
}
func receptionrecord(tx *sql.Tx,t_name string) error {
sql:="INSERT into `bdp_receptionrecord` (`会话ID`,`警报状态`,`用户昵称` ,`新老访客`,`咨询渠道`,`子渠道`,`页面访问时间`,`最后接待技能组`,`最后接待客服`,`人工接通方式`,`会话建立时间`,`会话结束时间`,`会话接待时长`,`机器人接待时长`,`人工接待时长`,`接入人工客服时间`,`人工咨询时长`,`排队时长`,`人工客服邮箱`,`咨询机器人消息数`,`机器人回复数`,`咨询人工消息数`,`人工回复数`,`撤回消息数`,`机器人有效接待是否转人工`,`转接次数`,`结束方式`,`总结客服`,`所属业务单元`,`问题类型`,`解决状态`,`总结备注`,`服务总结自定义字段名称`,`服务总结自定义字段值`,`人工是否标记无效`,`访问IP`,`位置`,`系统`,`用户ID`,`用户对接ID`,`姓名`,`备注`,`电话`,`邮箱`,`QQ`,`终端`,`params`,`来源页url`,`关键词`,`着陆页url`,`发起页url`,`本次访问页数`,`评价类型`,`问题是否解决`,`满意度评分`,`人工会话交互数`,`评价标签`,`评价内容`)"
sql+="SELECT `会话ID`,`警报状态`,`用户昵称`,`新老访客`,`咨询渠道`,`子渠道`,`页面访问时间`,`最后接待技能组`,`最后接待客服`,`人工接通方式`,`会话建立时间`,`会话结束时间`,`会话接待时长`,`机器人接待时长`,`人工接待时长`,`接入人工客服时间`,`人工咨询时长`,`排队时长`,`人工客服邮箱`,`咨询机器人消息数`,`机器人回复数`,`咨询人工消息数`,`人工回复数`,`撤回消息数`,`机器人有效接待是否转人工`,`转接次数`,`结束方式`,`总结客服`,`所属业务单元`,`问题类型`,`解决状态`,`总结备注`,`服务总结自定义字段名称`,`服务总结自定义字段值`,`人工是否标记无效`,`访问IP`,`位置`,`系统`,`用户ID`,`用户对接ID`,`姓名`,`备注`,`电话`,`邮箱`,`QQ`,`终端`,`params`,`来源页url`,`关键词`,`着陆页url`,`发起页url`,`本次访问页数`,`评价类型`,`问题是否解决`,`满意度评分`,`人工会话交互数`,`评价标签`,`评价内容` "
sql+=fmt.Sprintf(" from `%s`",t_name)
re, err := tx.Exec(sql)
if err != nil { log.Println(re,err)}
log.Println("insert into bdp_receptionrecord ok!")
return err
}
func messagerecord(tx *sql.Tx,t_name string) error {
sql:=fmt.Sprintf("DELETE from `%s` where `会话ID` is null;",t_name)
re, err := tx.Exec(sql)
sql="insert into bdp_messagerecord(`会话ID` ,`消息时间`,`消息来源`,`新老访客`,`来源类型`,`消息目标`,`目标类型`,`消息内容`)"
sql+="SELECT `会话ID` ,`消息时间`,`消息来源`,`新老访客`,`来源类型`,`消息目标`,`目标类型`,`消息内容`"
sql+=fmt.Sprintf(" from `%s`",t_name)
re, err = tx.Exec(sql)
if err != nil { log.Println(re,err)}
log.Println("insert into bdp_messagerecord ok!")
return err
}
func Zip(srcFile string, destZip string) error {
zipfile, err := os.Create(destZip)
if err != nil {
return err
}
defer zipfile.Close()
archive := zip.NewWriter(zipfile)
defer archive.Close()
filepath.Walk(srcFile, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
header, err := zip.FileInfoHeader(info)
if err != nil {
return err
}
header.Name = strings.TrimPrefix(path, filepath.Dir(srcFile) + "/")
// header.Name = path
if info.IsDir() {
header.Name += "/"
} else {
header.Method = zip.Deflate
}
writer, err := archive.CreateHeader(header)
if err != nil {
return err
}
if ! info.IsDir() {
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()
_, err = io.Copy(writer, file)
}
return err
})
return err
}
func Unzip(zipFile string, destDir string) error {
zipReader, err := zip.OpenReader(zipFile)
if err != nil {
return err
}
defer zipReader.Close()
for _, f := range zipReader.File {
fpath := filepath.Join(destDir, f.Name)
if f.FileInfo().IsDir() {
os.MkdirAll(fpath, os.ModePerm)
} else {
if err = os.MkdirAll(filepath.Dir(fpath), os.ModePerm); err != nil {
return err
}
inFile, err := f.Open()
if err != nil {
return err
}
defer inFile.Close()
outFile, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
if err != nil {
return err
}
defer outFile.Close()
_, err = io.Copy(outFile, inFile)
if err != nil {
return err
}
}
}
return nil
}
func walkFunc(pathstr string, info os.FileInfo, err error) error {
if info == nil {
// 文件名称超过限定长度等其他问题也会导致info == nil
// 如果此时return err 就会显示找不到路径,并停止查找。
println("can't find:(" + pathstr + ")")
return nil
}
if info.IsDir() {
println("This is folder:(" + pathstr + ")")
return nil
} else {
println("This is file:(" + pathstr + ")")
if path.Dir(pathstr)==zhichifileDir && strings.HasSuffix(info.Name(),".zip"){
Unzip(pathstr,zhichifileDir)
}
return nil
}
}
func showFileList(root string) {
err := filepath.Walk(root, walkFunc)
if err != nil {
fmt.Printf("filepath.Walk() error: %v\n", err)
}
return
}
func FileExist(path string) bool {
_, err := os.Lstat(path)
return !os.IsNotExist(err)
}
// 找指定日期{maxdate}的zip文件解压
// maxdate 形如:20200720-20200726
func GetAllFile(pathname string,maxdate string) error {
rd, err := ioutil.ReadDir(pathname)
var sw_reg = regexp.MustCompile(`^(serviceWorkload).+(\.zip)$`)
var rr_reg = regexp.MustCompile(`^(receptionRecord).+(\.zip)$`)
var mr_reg = regexp.MustCompile(`^(messageRecord).+(\.zip)$`)
myzip:=func (f_name string ,path_n string,c_dir string ){
fmt.Println(f_name)
if path_n == c_dir {
Unzip(path_n+"/"+f_name, c_dir)
}
}
for _, fi := range rd {
if fi.IsDir() { //fmt.Printf("[%s]\n", pathname+"\\"+fi.Name())
} else if sw_reg.MatchString(fi.Name()) {//serviceWorkload_20200622-20200628.zip
if maxdate==fi.Name()[16:33] {
myzip(fi.Name(),pathname ,zhichifileDir)
}
} else if rr_reg.MatchString(fi.Name()) { //receptionRecord20200704-20200704_44fd17ca-8773-4449-9712-eb7711177f3e.zip
if maxdate==fi.Name()[15:32] {
myzip(fi.Name(),pathname ,zhichifileDir)
}
} else if mr_reg.MatchString(fi.Name()) { //messageRecord_20200622-20200628.zip
if maxdate==fi.Name()[14:31] {
myzip(fi.Name(),pathname ,zhichifileDir)
}
}
}
rd, err = ioutil.ReadDir(pathname)
for _, fi := range rd {
if ! fi.IsDir() && path.Ext(fi.Name())==".csv"{
if fi.Name()[0:15]=="serviceWorkload" {
fileslist["serviceworkload"] = fi.Name()
}else if fi.Name()[0:5]=="recep" {
os.Rename(fi.Name(),"receptionRecord_"+maxdate+".csv")
fileslist["receptionrecord"]= "receptionRecord_"+maxdate+".csv"
} else if fi.Name()[0:5]=="messa" {
os.Rename(fi.Name(),"messageRecord_"+maxdate+".csv")
fileslist["messagerecord"]= "messageRecord_"+maxdate+".csv"
}
}
}
return err
}
func delete_csv(pathname string,suffix string) error {
rd, err := ioutil.ReadDir(pathname)
for _, fi := range rd {
if fi.IsDir() { //fmt.Printf("[%s]\n", pathname+"\\"+fi.Name())
} else if strings.HasSuffix(fi.Name(),suffix){
fmt.Println(fi.Name())
if pathname==zhichifileDir {
os.Remove(pathname+"/"+fi.Name())
}
}
}
log.Println("\ndelete_csv",pathname," -- ",suffix, "\n")
return err
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/laogg/csv2mysql.git
git@gitee.com:laogg/csv2mysql.git
laogg
csv2mysql
csv2mysql
master

搜索帮助