1 Star 2 Fork 0

禾木 / Python连接mssql进行数据同步

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
datasync.py 8.27 KB
一键复制 编辑 原始数据 按行查看 历史
禾木 提交于 2019-08-30 11:30 . 首次
import pymssql
import configparser
import logging
import sys
import ftplib
from time import sleep
import decimal
def mylogger():
    """Return this module's logger, writing to ``datasync.log`` and stdout.

    Both handlers share the format ``%(asctime)s %(levelname)s :%(message)s``
    and the logger level is set to INFO.

    ``logging.getLogger(__name__)`` always returns the same logger object, so
    the handlers are attached only on the first call; calling this function
    again returns the already-configured logger instead of duplicating every
    log line.
    """
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    if not logger.handlers:
        formatter = logging.Formatter('%(asctime)s %(levelname)s :%(message)s')
        # File log.
        file_handler = logging.FileHandler("datasync.log")
        file_handler.setFormatter(formatter)
        # Console log.
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(formatter)
        # Attach the concrete handlers to the logger.
        logger.addHandler(file_handler)
        logger.addHandler(console_handler)
    return logger
def getconfig():
    """Read ``config.ini`` from the current directory and return the parser.

    Returns:
        The populated ``configparser.ConfigParser``.

    Raises:
        configparser.NoSectionError: if the file has no ``[db]`` section
            (``ConfigParser.read`` itself silently ignores a missing file).
    """
    parser = configparser.ConfigParser()
    parser.read("config.ini")
    parser.sections()
    # options() is called only for its side effect: it raises when the
    # [db] section is absent, so a broken config fails here.
    parser.options("db")
    return parser
def readTableList():
    """Return the raw lines of ``table.txt`` from the current directory.

    Each element keeps its trailing newline, exactly as ``readlines()``
    yields it. The file is opened in a ``with`` block so the handle is
    closed even if reading fails.

    Raises:
        OSError: if ``table.txt`` cannot be opened.
    """
    with open("table.txt", "r") as tb_file:
        return tb_file.readlines()
# Create a database connection.
def createConnection(db_server, db_database, db_user, db_password):
    """Open a pymssql connection with a 20 second timeout.

    Returns the connection, or None when connecting raises; in that case
    the exception is logged through the module-level ``logger``.
    """
    try:
        return pymssql.connect(
            server=db_server,
            user=db_user,
            password=db_password,
            database=db_database,
            timeout=20,
        )
    except Exception as err:
        logger.error(err)
    return None
# Close a database connection.
def closeConnection(conn):
    """Call ``close()`` on *conn*; any exception it raises propagates."""
    conn.close()
# Query data.
def queryRows(conn, sql):
    """Run *sql* on *conn* and return its rows plus column metadata.

    Returns:
        A 3-tuple ``(rows, fld_str, szfld)``:

        * ``rows``    - the result of ``cursor.fetchall()``;
        * ``fld_str`` - the result column names joined with ``","``;
        * ``szfld``   - one ``%s`` placeholder per column joined with ``","``,
          suitable for building a parameterized INSERT.

        On any error the exception is logged through the module-level
        ``logger`` and ``([], "", "")`` is returned, so callers that unpack
        three values keep working instead of failing on ``None``.
    """
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute(sql)
        # description holds one sequence per result column; item 0 is the name.
        names = [fld[0] for fld in cursor.description]
        rows = cursor.fetchall()
        return rows, ",".join(names), ",".join(["%s"] * len(names))
    except Exception as err:
        logger.error(err)
        return [], "", ""
    finally:
        # Release the cursor on both the success and the failure path.
        if cursor is not None:
            try:
                cursor.close()
            except Exception as err:
                logger.error(err)
def ftpInit(server, user, password):
    """Connect to the FTP *server* and log in.

    Returns the logged-in ``ftplib.FTP`` object, or None on failure (the
    exception is logged through the module-level ``logger``). When the
    connection succeeds but the login fails, the half-open connection is
    closed before returning so it is not leaked.
    """
    ftp = None
    try:
        ftp = ftplib.FTP(server)   # connects to the host on construction
        ftp.login(user, password)
        return ftp
    except Exception as err:
        logger.error(err)
        if ftp is not None:
            # Connected but could not log in: drop the control connection.
            ftp.close()
        return None
def ftpDown(ftp, local_path, file_name):
    """Download *file_name* over *ftp* into ``local_path + file_name``.

    *local_path* is concatenated as-is, so it must already end with a path
    separator. The local file is opened in a ``with`` block so its handle
    is closed even when the transfer raises.

    Returns True on success; on any error the exception is logged through
    the module-level ``logger`` and False is returned.
    """
    try:
        file_local = local_path + file_name
        bufsize = 1024  # block size handed to retrbinary
        with open(file_local, 'wb') as fp:
            ftp.retrbinary('RETR %s' % file_name, fp.write, bufsize)
        return True
    except Exception as err:
        logger.error(err)
        return False
def _parseTableConfig(tb_cfg):
    """Parse one ``table.txt`` line into a dict describing a table to sync.

    Fields are ``;``-separated: table name (0), primary-key column name (1),
    primary-key position in the row (2), transfer-flag column name (3), bit
    index inside the flag column (4), update-version-table switch (5), FTP
    switch (6, optional) and the comma-separated row positions of the columns
    holding FTP file names (7, required when field 6 is ``1``).

    Every field is stripped, so the trailing newline of the line never ends
    up inside the last field.

    Returns None for blank lines and lines starting with ``#``.
    Raises ValueError when the line is malformed.
    """
    line = tb_cfg.strip()
    if not line or line.startswith("#"):
        return None
    fields = [part.strip() for part in line.split(";")]
    if len(fields) < 6:
        raise ValueError("expected at least 6 ';'-separated fields")
    use_ftp = len(fields) > 6 and fields[6] == "1"
    ftp_cols = []
    if use_ftp:
        if len(fields) < 8 or not fields[7]:
            raise ValueError("FTP transfer enabled but no file column positions given")
        ftp_cols = [int(pos) for pos in fields[7].split(",")]
    return {
        "table": fields[0],
        "key_name": fields[1],
        "key_pos": int(fields[2]),
        "flag_col": fields[3],
        # Bit mask (as text, ready for SQL) of this program's bit in flag_col.
        "mark": str(2 ** int(fields[4])),
        "update_version": fields[5] == "1",
        "use_ftp": use_ftp,
        "ftp_cols": ftp_cols,
    }


def _closeFtp(ftp):
    """End an FTP session politely, falling back to a hard close."""
    try:
        ftp.quit()
    except ftplib.all_errors:
        ftp.close()


def _downloadRowFiles(ftp, ftp_path, row, ftp_cols):
    """Download every file named in *row* at positions *ftp_cols*; False on first failure."""
    for pos in ftp_cols:
        file_name = row[pos]
        if not ftpDown(ftp, ftp_path, file_name):
            logger.error("ftpDown file " + str(file_name) + " Error! break")
            return False
        logger.info("ftpDown file " + str(file_name) + " success!")
    return True


def _collectTable(conn, conn1, cur1, client_ip, tb, ftp_user, ftp_password, ftp_path, update_flag):
    """Copy the not-yet-transferred rows of one table from a client to the server."""
    table = tb["table"]
    key_name = tb["key_name"]
    flag_col = tb["flag_col"]
    mark = tb["mark"]

    # Rows whose flag column does not have this program's bit set yet.
    sql = "select * from " + table + " where " + flag_col + " & " + mark + " <> " + mark
    logger.info(sql)
    result = queryRows(conn1, sql)
    # queryRows may signal failure with None; treat that like an empty result
    # (the error itself has already been logged there).
    rows, fld_str, szfld = result if result is not None else ([], "", "")
    if not rows:
        logger.info(table + " nothing to collect!")
        return

    ftp = None
    if tb["use_ftp"]:
        # This table has files to fetch from the client's FTP server.
        ftp = ftpInit(client_ip, ftp_user, ftp_password)
        if ftp is None:
            logger.error("ftpInit Error! continue")
            return
        logger.info("ftpInit success!")

    # Table and column names come from the local config file and cannot be
    # bound as parameters; the row values are always passed as parameters.
    delete_sql = " delete from " + table + " where " + key_name + "=%s"
    insert_sql = " insert into " + table + "(" + fld_str + ") values(" + szfld + ")"
    mark_sql = ("update " + table + " set " + flag_col + "=" + flag_col + " | " + mark
                + " where " + key_name + "=%s")

    cur = conn.cursor()
    try:
        logger.info("collecting " + table + "," + str(len(rows)) + " records waiting...")
        iNum = 0
        for row in rows:
            if ftp is not None and not _downloadRowFiles(ftp, ftp_path, row, tb["ftp_cols"]):
                # Leave the row unmarked so it is retried on the next pass.
                continue
            key = (row[tb["key_pos"]],)
            # Replace any previous copy of the row on the server.
            cur.execute(delete_sql, key)
            logger.debug("%s %s", insert_sql, row)
            cur.execute(insert_sql, row)
            conn.commit()
            if update_flag == 1:
                # Set this program's bit on the client so the row is not sent again.
                cur1.execute(mark_sql, key)
                conn1.commit()
            iNum += 1
        logger.info("collected:" + table + "," + str(iNum) + " records finished!")
        if tb["update_version"]:
            cur.execute("update TD_TABLE_VERSION set full_dt=getdate() where table_name=%s", (table,))
            conn.commit()
            logger.info("update TD_TABLE_VERSION table_name=" + table + " Record success")
    finally:
        cur.close()
        if ftp is not None:
            _closeFtp(ftp)


def _collectClient(conn, clt, tables, db_password, ftp_user, ftp_password, ftp_path, update_flag):
    """Connect to one client database and collect every configured table from it."""
    client_ip = clt[1]
    where = str(clt[0]) + ":IP=" + client_ip + ",DB:" + clt[2]
    logger.info("connecting " + where)
    # NOTE(review): the client row carries its own password (clt[4]) but the
    # original code deliberately connects with the server's db_password
    # instead; kept as-is - confirm this is intended.
    conn1 = createConnection(client_ip, clt[2], clt[3], db_password)
    if conn1 is None:
        logger.error("connected " + client_ip + " failed")
        return
    try:
        cur1 = conn1.cursor()
        logger.info("connected success:" + where)
        logger.info("collecting " + where + "...")
        for tb in tables:
            _collectTable(conn, conn1, cur1, client_ip, tb,
                          ftp_user, ftp_password, ftp_path, update_flag)
        logger.info("collected " + client_ip + " finished.")
    except Exception as err:
        # One failing client must not stop collection from the others.
        logger.error(err)
        for pending in (conn, conn1):
            try:
                pending.rollback()
            except Exception as rb_err:
                logger.error(rb_err)
    finally:
        closeConnection(conn1)


def dataCollection():
    """Run one collection pass: pull unsent rows from every client database.

    Reads the module-level globals ``cf`` (config parser), ``logger`` and
    ``table_config_list`` (raw lines of table.txt).

    Steps:
      1. connect to the server database from the ``[db]`` section;
      2. read the client list from ``[sys] client_table`` - all stations when
         ``[sys] stationid`` is 0, otherwise the lanes of that station;
      3. for each client and each configured table, copy the rows whose flag
         bit is not yet set (optionally downloading the files they name over
         FTP), then - when ``[sys] update_flag`` is 1 - set the bit on the
         client.

    Errors for a single client are logged and the pass continues with the
    next client; the server connection is always closed on exit.
    """
    db_server = cf.get("db", "server")
    db_user = cf.get("db", "user")
    db_password = cf.get("db", "password")
    db_database = cf.get("db", "database")
    ftp_user = cf.get("ftp", "user")
    ftp_password = cf.get("ftp", "password")
    ftp_path = cf.get("ftp", "path")
    update_flag = cf.getint('sys', 'update_flag')
    stid = cf.getint("sys", "stationid")
    client_table = cf.get("sys", "client_table")

    conn = createConnection(db_server, db_database, db_user, db_password)
    if conn is None:
        logger.error("服务器数据库连接失败!!请检查配置文件数据库配置信息是否正确!")
        return
    try:
        # stid is an int (getint), so it must be compared with 0, not "0".
        if stid == 0:
            sql = "select station_id,db_ip,db_name,db_user,db_password from " + client_table
        else:
            sql = ("select lane_id,db_ip,db_name,db_user,db_password from " + client_table
                   + " where STATION_ID=" + str(stid))
        result = queryRows(conn, sql)
        if result is None:
            # queryRows already logged the underlying error.
            logger.error("query client list failed")
            return
        clt_list = result[0]

        # Parse table.txt once per pass instead of once per client.
        tables = []
        for tb_cfg in table_config_list:
            try:
                tb = _parseTableConfig(tb_cfg)
            except ValueError as err:
                logger.error("Error:" + tb_cfg.rstrip() + " (" + str(err) + ")")
                continue
            if tb is not None:
                tables.append(tb)

        for clt in clt_list:
            _collectClient(conn, clt, tables, db_password,
                           ftp_user, ftp_password, ftp_path, update_flag)
    finally:
        closeConnection(conn)
if __name__ == '__main__':
    # These module-level names are read as globals by the functions above.
    logger = mylogger()
    cf = getconfig()
    table_config_list = readTableList()
    slptime = cf.getint('sys', 'slptime')
    update_flag = cf.getint('sys', 'update_flag')
    # slptime == 0 means a single pass; any other value polls forever,
    # sleeping slptime seconds between passes.
    dataCollection()
    while slptime != 0:
        sleep(slptime)
        dataCollection()
Python
1
https://gitee.com/lxfsky/python_mssql_sync.git
git@gitee.com:lxfsky/python_mssql_sync.git
lxfsky
python_mssql_sync
Python连接mssql进行数据同步
master

搜索帮助