fix: replace requests with httpx

This commit is contained in:
abc1763613206
2025-02-14 00:23:35 +08:00
parent 506e30af24
commit af3e85e11b

123
main.py
View File

@@ -4,7 +4,7 @@ import json
import os import os
import re import re
import traceback import traceback
import requests import httpx
import subprocess import subprocess
import time import time
import shutil import shutil
@@ -14,7 +14,7 @@ from sys import stdout
from termcolor import colored, RESET from termcolor import colored, RESET
from datetime import datetime from datetime import datetime
from func_timeout import func_set_timeout, FunctionTimedOut from func_timeout import func_set_timeout, FunctionTimedOut
from requests.adapters import HTTPAdapter
dt = datetime.now() dt = datetime.now()
# Channel Group Source Link Description # Channel Group Source Link Description
# Description 应当对该源的已知参数进行标注如码率HDR # Description 应当对该源的已知参数进行标注如码率HDR
@@ -47,70 +47,67 @@ def get_stream(num, clist, uri):
def check_channel(clist, num): def check_channel(clist, num):
# clist 为一行 csv # clist 为一行 csv
uri = clist[3] uri = clist[3]
requests.adapters.DEFAULT_RETRIES = 3
try: try:
r = requests.get(clist[3], timeout=1) # 先测能不能正常访问 with httpx.Client(timeout=0.5) as client:
if (r.status_code == requests.codes.ok): ReqStatus = False
# ffprobe = FFprobe(inputs={uri: '-v warning'}) try:
# errors = tuple(filter( r = client.get(clist[3], follow_redirects=True)
# lambda line: not (line in ('', RESET) or any(regex.search(line) for regex in SKIP_FFPROBE_MESSAGES)), if r.status_code == 200:
# ffprobe.run(stderr=PIPE)[1].decode('utf-8').split('\n') ReqStatus = True
# )) except httpx.UnsupportedProtocol:
# if errors: # https://github.com/Jamim/iptv-checker/blob/master/iptv-checker.py#L26 ReqStatus = True
# print('[{}] {}({}) Error:{}'.format(str(num), clist[0], clist[2], str(errors))) if ReqStatus:
# return False cdata = get_stream(num, clist, uri)
# else: # 查视频信息 if cdata:
cdata = get_stream(num, clist, uri) flagAudio = 0
if cdata: flagVideo = 0
flagAudio = 0 flagHDR = 0
flagVideo = 0 flagHEVC = 0
flagHDR = 0 vwidth = 0
flagHEVC = 0 vheight = 0
vwidth = 0 for i in cdata['streams']:
vheight = 0 if i['codec_type'] == 'video':
for i in cdata['streams']: flagVideo = 1
if i['codec_type'] == 'video': if 'color_space' in i:
flagVideo = 1 # https://www.reddit.com/r/ffmpeg/comments/kjwxm9/how_to_detect_if_video_is_hdr_or_sdr_batch_script/
if 'color_space' in i: if 'bt2020' in i['color_space']:
# https://www.reddit.com/r/ffmpeg/comments/kjwxm9/how_to_detect_if_video_is_hdr_or_sdr_batch_script/ flagHDR = 1
if 'bt2020' in i['color_space']: if i['codec_name'] == 'hevc':
flagHDR = 1 flagHEVC = 1
if i['codec_name'] == 'hevc': if vwidth <= i['coded_width']: # 取最高分辨率
flagHEVC = 1 vwidth = i['coded_width']
if vwidth <= i['coded_width']: # 取最高分辨率 vheight = i['coded_height']
vwidth = i['coded_width'] elif i['codec_type'] == 'audio':
vheight = i['coded_height'] flagAudio = 1
elif i['codec_type'] == 'audio': if flagAudio == 0:
flagAudio = 1 print('[{}] {}({}) Error: Video Only!'.format(
if flagAudio == 0: str(num), clist[0], clist[2]))
print('[{}] {}({}) Error: Video Only!'.format( return False
str(num), clist[0], clist[2])) if flagVideo == 0:
return False print('[{}] {}({}) Error: Audio Only!'.format(
if flagVideo == 0: str(num), clist[0], clist[2]))
print('[{}] {}({}) Error: Audio Only!'.format( return False
str(num), clist[0], clist[2])) if (vwidth == 0) or (vheight == 0):
return False print('[{}] {}({}) Error: {}x{}'.format(
if (vwidth == 0) or (vheight == 0): str(num), clist[0], clist[2], vwidth, vheight))
print('[{}] {}({}) Error: {}x{}'.format(
str(num), clist[0], clist[2], vwidth, vheight))
if flagHDR == 0: if flagHDR == 0:
print('[{}] {}({}) PASS: {}*{}'.format(str(num), print('[{}] {}({}) PASS: {}*{}'.format(str(num),
clist[0], clist[2], vwidth, vheight)) clist[0], clist[2], vwidth, vheight))
return [vwidth, vheight, ''] return [vwidth, vheight, '']
if flagHDR == 1: if flagHDR == 1:
print('[{}] {}({}) PASS(HDR Enabled): {}*{}'.format(str(num), print('[{}] {}({}) PASS(HDR Enabled): {}*{}'.format(str(num),
clist[0], clist[2], vwidth, vheight)) clist[0], clist[2], vwidth, vheight))
return [vwidth, vheight, 'HDR'] return [vwidth, vheight, 'HDR']
if flagHEVC == 1: # https://news.ycombinator.com/item?id=19389496 默认有HDR的算HEVC if flagHEVC == 1: # https://news.ycombinator.com/item?id=19389496 默认有HDR的算HEVC
print('[{}] {}({}) PASS(HEVC Enabled): {}*{}'.format(str(num), print('[{}] {}({}) PASS(HEVC Enabled): {}*{}'.format(str(num),
clist[0], clist[2], vwidth, vheight)) clist[0], clist[2], vwidth, vheight))
return [vwidth, vheight, 'HEVC'] return [vwidth, vheight, 'HEVC']
else:
return False
else: else:
return False print('[{}] {}({}) Error:{}'.format(
else: str(num), clist[0], clist[2], str(r.status_code)))
print('[{}] {}({}) Error:{}'.format(
str(num), clist[0], clist[2], str(r.status_code)))
return False return False
except Exception as e: except Exception as e:
# traceback.print_exc() # traceback.print_exc()