前言 随着要下载的视频越来越多,突然发现我之前写的那个m3u8下载速度太慢了…
刚好在公众号简说python 上看到一个多线程教程,就来试试用多线程下载一下吧!
思路 多线程是用threading库来实现,基础的比较简单,主要是要注意限制线程数量
群里开车现在都是发一堆二维码,所以还得用个二维码识别zxing库
把二维码都保存在一个文件夹,然后启动py,选择这个文件夹就可以自动下载啦!
代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 import requestsimport osimport timeimport threadingfrom faker import Fakerfrom tkinter import filedialogfrom tkinter import messageboximport osimport zxing fake = Faker()def get_ts_urls (m3u8_index ): m3u8_url_list = [] try : headers = {'User-Agent' : fake.user_agent()} m3u8s = requests.get(m3u8_index, headers).text m3u8s = m3u8s.strip() m3u8s = m3u8s.split('\n' ) for m3u8 in m3u8s: if m3u8.find('#' ) == -1 : m3u8_server = url_m3u8_index.replace(url_m3u8_index.split('/' )[-1 ], '' ) m3u8_url = m3u8_server + m3u8 m3u8_url_list.append(m3u8_url) print (m3u8_url_list) return m3u8_url_list except : print ('url访问出错' ) return False def down_m3u8 (url ): try : headers = {'User-Agent' : fake.user_agent()} res = requests.get(url, headers).content try : os.mkdir('临时下载文件夹' ) except : pass ts_name = '临时下载文件夹/' + url.split('/' )[-1 ] ts_names.append(ts_name) with open (ts_name, 'wb' ) as f: f.write(res) print ('开始下载{}' .format (ts_name)) except : pass pool_sema.release()def mg_vd (): print ('合并视频中...' ) t = time.strftime("%Y.%m.%d.%H.%M.%S" , time.localtime()) file_path = str (t) + '.ts' sd_ts_names = sorted (ts_names) print (sd_ts_names) with open (file_path, 'wb' ) as fl: for i in sd_ts_names: fl.write(open (i, 'rb' ).read()) os.remove(i) try : os.rmdir('临时下载文件夹' ) except : pass def dg_code (): urls = [] thread1 = threading.Thread(target=read_dir) thread1.start() thread1.join() code_path = code_paths[0 ] imgs = os.listdir(code_path) for img in imgs: img_path = os.path.join(code_path, img) reader = zxing.BarCodeReader() barcode = reader.decode(img_path) url = barcode.parsed urls.append(url) print (urls) return urlsdef read_dir (): print ('请先选择二维码所在的文件夹' ) code_path = filedialog.askdirectory() if not code_path: messagebox.showerror("提示" , "请先选择二维码所在的文件夹" ) read_dir() else : print ('选择文件夹:{}' .format (code_path)) code_paths.append(code_path)if __name__ == '__main__' : code_paths = [] urls_m3u8_index = dg_code() for url_m3u8_index in urls_m3u8_index: m3u8_urls = get_ts_urls(url_m3u8_index) ts_names = [] if m3u8_urls: thread_list = [] max_connections = 5 pool_sema = threading.BoundedSemaphore(max_connections) for i in m3u8_urls: pool_sema.acquire() thread = threading.Thread(target=down_m3u8, args=[i]) thread.start() thread_list.append(thread) for t in thread_list: t.join() mg_vd() print ('下载完成' )
后记 仅供学习交流,咳咳咳