说明:基于FTP的程序自动更新,配合bat脚本执行
import os,os.path,time,zipfile,socket
from ftplib import FTP # 引入ftp模块
#FTP操作
class MyFtp:
ftp = FTP()
def __init__(self,host,port=21):
try:
self.ftp.connect(host,port)
except:
print("FTP服务器连接失败!")
def login(self,username,pwd):
self.ftp.set_debuglevel(0) # 打开调试级别2,显示详细信息
try:
self.ftp.login(username,pwd)
except:
print("FTP服务器登录失败!")
return False
print(self.ftp.welcome)
data = []
self.ftp.dir(data.append)
for line in data:
print("-", line)
def downloadFile(self,localpath,remotepath,filename):
os.chdir(localpath) # 切换工作路径到下载目录
try:
self.ftp.cwd(remotepath) # 要登录的ftp目录
except:
print("无法打开FTP目录!")
return False
self.ftp.nlst() # 获取目录下的文件
file_handle = open(filename,"wb").write # 以写模式在本地打开文件
try:
self.ftp.retrbinary('RETR %s' % os.path.basename(filename),file_handle,blocksize=1024) # 下载ftp文件
except:
print("从FTP下载【"+str(filename)+"】文件失败!")
return False
# ftp.delete(filename) # 删除ftp服务器上的文件
def close(self):
self.ftp.set_debuglevel(0) # 关闭调试
self.ftp.quit()
#解压缩操作
class MyZip:
def __init__(self):
pass
# 压缩
def zip_dir(self,dirname,zipfilename):
filelist = []
if os.path.isfile(dirname):
filelist.append(dirname)
else :
for root, dirs, files in os.walk(dirname):
for name in files:
filelist.append(os.path.join(root, name))
zf = zipfile.ZipFile(zipfilename, "w", zipfile.zlib.DEFLATED)
for tar in filelist:
arcname = tar[len(dirname):]
#print arcname
zf.write(tar,arcname)
zf.close()
# 解压
def unzip_file(self,zipfilename, unziptodir):
if not os.path.exists(unziptodir): os.mkdir(unziptodir, '0777')
zfobj = zipfile.ZipFile(zipfilename)
for name in zfobj.namelist():
name = name.replace('\\','/')
if name.endswith('/'):
try:
os.mkdir(os.path.join(unziptodir, name))
except:
print("【"+name+"】文件夹已存在,不再创建~")
continue
# if 'Public.py' in name:
# print("【"+name+"】文件夹不允许被替换,跳过~")
# continue
ext_filename = os.path.join(unziptodir, name)
ext_dir= os.path.dirname(ext_filename)
if not os.path.exists(ext_dir) : os.mkdir(ext_dir,'0777')
print("ext_filename:"+ext_filename)
try:
outfile = open(ext_filename, 'w+b')
except:
print("【"+ext_filename+"】文件创建失败~")
continue
outfile.write(zfobj.read(name))
outfile.close()
def unzip2_file(self,zipfilename, unziptodir):
if not os.path.exists(unziptodir): os.mkdir(unziptodir, '0777')
zfobj = zipfile.ZipFile(zipfilename)
for name in zfobj.namelist():
name = name.replace('\\','/')
if name.endswith('/'):
try:
os.mkdir(os.path.join(unziptodir, name))
except:
print("文件夹已存在,不再创建~")
ext_filename = os.path.join(unziptodir, name)
ext_dir= os.path.dirname(ext_filename)
if not os.path.exists(ext_dir) : os.mkdir(ext_dir,'0777')
print("ext_filename:"+ext_filename)
outfile = open(ext_filename, 'w+b')
outfile.write(zfobj.read(name))
outfile.close()
if __name__ == '__main__':
socket.setdefaulttimeout(120)
try:
#当前文件位置
dPath = os.getcwd()
zipObj = MyZip()
ftp = MyFtp('127.0.0.1')
ftp.login('root','root')
while True:
version = ftp.downloadFile(dPath+'/update','/','version.txt')
if version == False :
print("出现异常,1分钟后重试~")
time.sleep(60)
continue
newV = ""
oldV = ""
with open(dPath+"/update/version.txt", "r") as f: # 打开文件
newV = f.read() # 读取文件
print("最新版本:"+str(newV))
with open(dPath+"/version.txt", "r") as fS: # 打开文件
oldV = fS.read() # 读取文件
print("当前版本:"+str(oldV))
if newV == "" or oldV == "":
print("版本对比失败,1分钟后重试~")
ftp.close()
time.sleep(60)
continue
if float(newV) > float(oldV):
print("需要更新~")
cmdObj = os.popen('tasklist /nh|find /i "chrome.exe"')
tasklist = cmdObj.read()
cmdObj.close()
if len(tasklist) > 0:
print("有任务正在执行,30秒后重试~")
time.sleep(30)
continue
os.system('taskkill /f /im cmd.exe')
os.system('taskkill /f /im python.exe')
zipFile = ftp.downloadFile(dPath+'/update','/','Hello.zip')
if zipFile == False :
print("出现异常,30秒后重试~")
time.sleep(30)
continue
zipObj.unzip_file(dPath+'/update/Hello.zip',dPath+'\\')
os.system('start Main.bat')
print("更新完成~")
else:
print("不用更新~")
print("操作完成,60秒后再次执行~")
# ftp.quit()
time.sleep(60)
except Exception as e:
print("发生不可预料的错误,日志如下:")
print(e)
time.sleep(9999)
4 条评论
这篇文章提供了宝贵的经验和见解,对读者有很大的启发和帮助。
作者的观点新颖且实用,让人在阅读中获得了新的思考和灵感。
古典诗词化用自然,毫无斧凿痕迹。
感谢分享