说明:基于FTP的程序自动更新,配合bat脚本执行

import os,os.path,time,zipfile,socket
from ftplib import FTP  # 引入ftp模块

#FTP操作
class MyFtp:
    ftp = FTP()
    def __init__(self,host,port=21):
        try:
            self.ftp.connect(host,port)
        except:
           print("FTP服务器连接失败!")
           
        

    def login(self,username,pwd):
        self.ftp.set_debuglevel(0)  # 打开调试级别2,显示详细信息
        try:
            self.ftp.login(username,pwd)
        except:
            print("FTP服务器登录失败!")
            return False
        
        print(self.ftp.welcome)
        data = []
        self.ftp.dir(data.append)
        for line in data:
            print("-", line)

    def downloadFile(self,localpath,remotepath,filename):
        os.chdir(localpath)   # 切换工作路径到下载目录
        try:
            self.ftp.cwd(remotepath)   # 要登录的ftp目录
        except:
            print("无法打开FTP目录!")
            return False
        
        self.ftp.nlst()  # 获取目录下的文件
        file_handle = open(filename,"wb").write   # 以写模式在本地打开文件
        try:
            self.ftp.retrbinary('RETR %s' % os.path.basename(filename),file_handle,blocksize=1024)  # 下载ftp文件
        except:
            print("从FTP下载【"+str(filename)+"】文件失败!")
            return False
        
        # ftp.delete(filename)  # 删除ftp服务器上的文件

    def close(self):
        self.ftp.set_debuglevel(0)  # 关闭调试
        self.ftp.quit()

#解压缩操作
class MyZip:
    def __init__(self):
        pass

    # 压缩
    def zip_dir(self,dirname,zipfilename):
        filelist = []
        if os.path.isfile(dirname):
            filelist.append(dirname)
        else :
            for root, dirs, files in os.walk(dirname):
                for name in files:
                    filelist.append(os.path.join(root, name))
        zf = zipfile.ZipFile(zipfilename, "w", zipfile.zlib.DEFLATED)
        for tar in filelist:
            arcname = tar[len(dirname):]
            #print arcname
            zf.write(tar,arcname)
        zf.close()

    # 解压
    def unzip_file(self,zipfilename, unziptodir):
        if not os.path.exists(unziptodir): os.mkdir(unziptodir, '0777')
        zfobj = zipfile.ZipFile(zipfilename)
        for name in zfobj.namelist():
            name = name.replace('\\','/')
            if name.endswith('/'):
                try:
                    os.mkdir(os.path.join(unziptodir, name))
                except:
                    print("【"+name+"】文件夹已存在,不再创建~")
                continue
            
            # if 'Public.py' in name:
            #     print("【"+name+"】文件夹不允许被替换,跳过~")
            #     continue
                

            ext_filename = os.path.join(unziptodir, name)
            ext_dir= os.path.dirname(ext_filename)
            if not os.path.exists(ext_dir) : os.mkdir(ext_dir,'0777')
            print("ext_filename:"+ext_filename)

            try:
                outfile = open(ext_filename, 'w+b')
            except:
                print("【"+ext_filename+"】文件创建失败~")
                continue
           
            outfile.write(zfobj.read(name))
            outfile.close()

    def unzip2_file(self,zipfilename, unziptodir):
        if not os.path.exists(unziptodir): os.mkdir(unziptodir, '0777')
        zfobj = zipfile.ZipFile(zipfilename)
        for name in zfobj.namelist():
            name = name.replace('\\','/')
            if name.endswith('/'):
                try:
                    os.mkdir(os.path.join(unziptodir, name))
                except:
                    print("文件夹已存在,不再创建~")
                
            ext_filename = os.path.join(unziptodir, name)
            ext_dir= os.path.dirname(ext_filename)
            if not os.path.exists(ext_dir) : os.mkdir(ext_dir,'0777')
            print("ext_filename:"+ext_filename)
            outfile = open(ext_filename, 'w+b')
            outfile.write(zfobj.read(name))
            outfile.close()


if __name__ == '__main__':
    socket.setdefaulttimeout(120) 
    try:
        #当前文件位置
        dPath = os.getcwd()
        zipObj = MyZip()

        ftp = MyFtp('127.0.0.1')
        ftp.login('root','root')
        while True:
            version = ftp.downloadFile(dPath+'/update','/','version.txt')
           
            if version == False : 
                print("出现异常,1分钟后重试~")
                time.sleep(60)
                continue
            newV = ""
            oldV = ""
            with open(dPath+"/update/version.txt", "r") as f:  # 打开文件
                newV = f.read()  # 读取文件
                print("最新版本:"+str(newV))
            
            with open(dPath+"/version.txt", "r") as fS:  # 打开文件
                oldV = fS.read()  # 读取文件
                print("当前版本:"+str(oldV))

            if newV == "" or oldV == "":
                print("版本对比失败,1分钟后重试~")
                ftp.close()
                time.sleep(60)
                continue
            
            if float(newV) > float(oldV):
                print("需要更新~")

                cmdObj = os.popen('tasklist /nh|find /i "chrome.exe"')
                tasklist = cmdObj.read()
                cmdObj.close()
                if len(tasklist) > 0:
                    print("有任务正在执行,30秒后重试~")
                    time.sleep(30)
                    continue
                
                os.system('taskkill /f /im cmd.exe')
                os.system('taskkill /f /im python.exe')

                zipFile = ftp.downloadFile(dPath+'/update','/','Hello.zip')
                if zipFile == False : 
                    print("出现异常,30秒后重试~")
                    time.sleep(30)
                    continue

                zipObj.unzip_file(dPath+'/update/Hello.zip',dPath+'\\')
                os.system('start Main.bat')
                print("更新完成~")
            else:
                print("不用更新~")

            print("操作完成,60秒后再次执行~")
            # ftp.quit()
            time.sleep(60)
    except Exception as e:
        print("发生不可预料的错误,日志如下:")
        print(e)
        time.sleep(9999)
最后修改:2023 年 12 月 05 日
如果觉得我的文章对你有用,请随意赞赏