python3多线程下载视频

前言

随着要下载的视频越来越多,突然发现我之前写的那个m3u8下载速度太慢了…

刚好在公众号简说python上看到一个多线程教程,就来试试用多线程下载一下吧!

思路

多线程是用threading库来实现,基础的比较简单,主要是要注意限制线程数量

群里开车现在都是发一堆二维码,所以还得用个二维码识别zxing库

把二维码都保存在一个文件夹,然后启动py,选择这个文件夹就可以自动下载啦!

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import requests
import os
import time
import threading
from faker import Faker
from tkinter import filedialog
from tkinter import messagebox
import os
import zxing


# 初始化
fake = Faker()


# 获取m3u8下载链接
def get_ts_urls(m3u8_index):
# 储存m3u8链接
m3u8_url_list = []

try:
# 定义请求头
headers = {'User-Agent': fake.user_agent()}
# get获取
m3u8s = requests.get(m3u8_index, headers).text
# 拆分
m3u8s = m3u8s.strip()
m3u8s = m3u8s.split('\n')
# print(m3u8s)
for m3u8 in m3u8s:
# print(m3u8)
# 判断 #
if m3u8.find('#') == -1:
# print(m3u8)
# 获取前缀
m3u8_server = url_m3u8_index.replace(url_m3u8_index.split('/')[-1], '')
# 合并
m3u8_url = m3u8_server + m3u8
# print(m3u8_url)
# 添加到 list
m3u8_url_list.append(m3u8_url)
print(m3u8_url_list)
return m3u8_url_list
except:
print('url访问出错')
return False


# 下载视频
def down_m3u8(url):
try:
# 定义随机请求头
headers = {'User-Agent': fake.user_agent()}
res = requests.get(url, headers).content
try:
os.mkdir('临时下载文件夹')
except:
pass
ts_name = '临时下载文件夹/' + url.split('/')[-1]
ts_names.append(ts_name)
with open(ts_name, 'wb') as f:
f.write(res)
print('开始下载{}'.format(ts_name))
except:
pass
# 线程结束锁
pool_sema.release()


# 合并视频
def mg_vd():
print('合并视频中...')
t = time.strftime("%Y.%m.%d.%H.%M.%S", time.localtime())
file_path = str(t) + '.ts'
# 先排序
sd_ts_names = sorted(ts_names)
print(sd_ts_names)
with open(file_path, 'wb') as fl:
for i in sd_ts_names:
fl.write(open(i, 'rb').read())
# 删除原视频
os.remove(i)
# 删除文件夹
try:
os.rmdir('临时下载文件夹')
except:
pass


# 识别二维码
def dg_code():
# 创建识别到的链接list
urls = []
# 获取二维码储存文件夹
thread1 = threading.Thread(target=read_dir)
thread1.start()
thread1.join()
# 遍历文件夹
code_path = code_paths[0]
imgs = os.listdir(code_path)
# print(imgs)
for img in imgs:
img_path = os.path.join(code_path, img)
# 开始识别二维码
reader = zxing.BarCodeReader()
barcode = reader.decode(img_path)
url = barcode.parsed
# print(url)
# 加入list
urls.append(url)

print(urls)
return urls


# 读取文件夹
def read_dir():
# 选择二维码所在文件夹
print('请先选择二维码所在的文件夹')
code_path = filedialog.askdirectory() # 选择文件夹,路径保存于foldr_patch中
if not code_path:
messagebox.showerror("提示", "请先选择二维码所在的文件夹")
read_dir()
else:
print('选择文件夹:{}'.format(code_path))
code_paths.append(code_path)


if __name__ == '__main__':
# m3u8url
# 识别二维码
code_paths = []
urls_m3u8_index = dg_code()
for url_m3u8_index in urls_m3u8_index:
# 获取下载url
m3u8_urls = get_ts_urls(url_m3u8_index)
# 储存临时视频路径list
ts_names = []
if m3u8_urls:
# print(m3u8_urls)
# 线程list
thread_list = []
# 管理线程数
max_connections = 5
pool_sema = threading.BoundedSemaphore(max_connections)

for i in m3u8_urls:
# 线程开始锁
pool_sema.acquire()
thread = threading.Thread(target=down_m3u8, args=[i])
thread.start()
thread_list.append(thread)

for t in thread_list:
t.join()

# 合并视频
mg_vd()

print('下载完成')

后记

仅供学习交流,咳咳咳