[ie, cleanup] No from stdlib imports in extractors (#8978)

This commit is contained in:
pukkandan 2024-04-01 05:31:09 +05:30 committed by GitHub
parent a25a424323
commit e3a3ed8a98
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 56 additions and 61 deletions

View file

@ -1,5 +1,5 @@
import functools
import re
from functools import partial
from .common import InfoExtractor
from ..utils import (
@ -349,7 +349,7 @@ def _extract_episode_info(self, title):
r'(?P<title>.*)',
]
return traverse_obj(patterns, (..., {partial(re.match, string=title)}, {
return traverse_obj(patterns, (..., {functools.partial(re.match, string=title)}, {
'season_number': ('season_number', {int_or_none}),
'episode_number': ('episode_number', {int_or_none}),
'episode': ((

View file

@ -1,4 +1,4 @@
from functools import partial
import functools
from .common import InfoExtractor
from ..utils import (
@ -50,7 +50,7 @@ def _extract_base_info(data):
**traverse_obj(data, {
'title': 'title',
'description': 'description',
'duration': ('duration', {partial(int_or_none, scale=1000)}),
'duration': ('duration', {functools.partial(int_or_none, scale=1000)}),
'timestamp': ('schedulingStart', {parse_iso8601}),
'season_number': 'seasonNumber',
'episode_number': 'episodeNumber',

View file

@ -1,5 +1,5 @@
import functools
import re
from functools import partial
from .common import InfoExtractor
from ..networking.exceptions import HTTPError
@ -115,9 +115,9 @@ def _real_extract(self, url):
note='Downloading metadata overlay', fatal=False,
), {
'title': (
{partial(get_element_text_and_html_by_tag, 'h3')}, 0,
{partial(re.sub, r'<span[^>]*>[^<]+</span>', '')}, {clean_html}),
'description': ({partial(get_element_text_and_html_by_tag, 'p')}, 0, {clean_html}),
{functools.partial(get_element_text_and_html_by_tag, 'h3')}, 0,
{functools.partial(re.sub, r'<span[^>]*>[^<]+</span>', '')}, {clean_html}),
'description': ({functools.partial(get_element_text_and_html_by_tag, 'p')}, 0, {clean_html}),
}))
return result

View file

@ -1,5 +1,5 @@
import json
from socket import timeout
import socket
from .common import InfoExtractor
from ..utils import (
@ -56,7 +56,7 @@ def canonical_url(h):
try:
self.to_screen('%s: Checking %s video format URL' % (video_id, format_id))
self._downloader._opener.open(video_url, timeout=5).close()
except timeout:
except socket.timeout:
self.to_screen(
'%s: %s URL is invalid, skipping' % (video_id, format_id))
continue

View file

@ -1,4 +1,4 @@
from itertools import zip_longest
import itertools
import re
from .common import InfoExtractor
@ -156,7 +156,7 @@ class LinkedInLearningIE(LinkedInLearningBaseIE):
def json2srt(self, transcript_lines, duration=None):
srt_data = ''
for line, (line_dict, next_dict) in enumerate(zip_longest(transcript_lines, transcript_lines[1:])):
for line, (line_dict, next_dict) in enumerate(itertools.zip_longest(transcript_lines, transcript_lines[1:])):
start_time, caption = line_dict['transcriptStartAt'] / 1000, line_dict['caption']
end_time = next_dict['transcriptStartAt'] / 1000 if next_dict else duration or start_time + 1
srt_data += '%d\n%s --> %s\n%s\n\n' % (line + 1, srt_subtitles_timecode(start_time),

View file

@ -1,4 +1,3 @@
from __future__ import unicode_literals
from .common import InfoExtractor
from ..utils import (
traverse_obj,

View file

@ -1,4 +1,4 @@
from base64 import b64decode
import base64
from .common import InfoExtractor
from ..utils import (
@ -81,7 +81,7 @@ def _real_extract(self, url):
'url': thumbnail_url,
}
thumb_name = url_basename(thumbnail_url)
thumb_name = str(b64decode(thumb_name + '=' * (-len(thumb_name) % 4)))
thumb_name = str(base64.b64decode(thumb_name + '=' * (-len(thumb_name) % 4)))
thumb.update(parse_resolution(thumb_name))
thumbnails.append(thumb)

View file

@ -4,8 +4,8 @@
import itertools
import json
import re
import urllib.parse
import time
from urllib.parse import parse_qs, urlparse
from .common import InfoExtractor
from ..utils import (
@ -388,7 +388,7 @@ def _extract_highlight(self, show_id, highlight_id):
def _real_extract(self, url):
show_id = self._match_id(url)
qs = parse_qs(urlparse(url).query)
qs = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
if not self._yes_playlist(show_id, qs.get('shareHightlight')):
return self._extract_highlight(show_id, qs['shareHightlight'][0])

View file

@ -1,9 +1,9 @@
import hashlib
import itertools
import json
import random
import re
import time
from hashlib import md5
from random import randint
from .common import InfoExtractor
from ..aes import aes_ecb_encrypt, pkcs7_padding
@ -34,7 +34,7 @@ def _create_eapi_cipher(self, api_path, query_body, cookies):
request_text = json.dumps({**query_body, 'header': cookies}, separators=(',', ':'))
message = f'nobody{api_path}use{request_text}md5forencrypt'.encode('latin1')
msg_digest = md5(message).hexdigest()
msg_digest = hashlib.md5(message).hexdigest()
data = pkcs7_padding(list(str.encode(
f'{api_path}-36cd479b6b5-{request_text}-36cd479b6b5-{msg_digest}')))
@ -53,7 +53,7 @@ def _download_eapi_json(self, path, video_id, query_body, headers={}, **kwargs):
'__csrf': '',
'os': 'pc',
'channel': 'undefined',
'requestId': f'{int(time.time() * 1000)}_{randint(0, 1000):04}',
'requestId': f'{int(time.time() * 1000)}_{random.randint(0, 1000):04}',
**traverse_obj(self._get_cookies(self._API_BASE), {
'MUSIC_U': ('MUSIC_U', {lambda i: i.value}),
})

View file

@ -4,8 +4,7 @@
import json
import re
import time
from urllib.parse import urlparse
import urllib.parse
from .common import InfoExtractor, SearchInfoExtractor
from ..networking import Request
@ -957,7 +956,7 @@ def _real_extract(self, url):
'frontend_id': traverse_obj(embedded_data, ('site', 'frontendId')) or '9',
})
hostname = remove_start(urlparse(urlh.url).hostname, 'sp.')
hostname = remove_start(urllib.parse.urlparse(urlh.url).hostname, 'sp.')
latency = try_get(self._configuration_arg('latency'), lambda x: x[0])
if latency not in self._KNOWN_LATENCY:
latency = 'high'

View file

@ -1,8 +1,8 @@
import calendar
import json
import datetime
import functools
from datetime import datetime, timezone
from random import random
import json
import random
from .common import InfoExtractor
from ..compat import (
@ -243,7 +243,7 @@ def _mark_watched(self, base_url, video_id, delivery_info):
invocation_id = delivery_info.get('InvocationId')
stream_id = traverse_obj(delivery_info, ('Delivery', 'Streams', ..., 'PublicID'), get_all=False, expected_type=str)
if invocation_id and stream_id and duration:
timestamp_str = f'/Date({calendar.timegm(datetime.now(timezone.utc).timetuple())}000)/'
timestamp_str = f'/Date({calendar.timegm(datetime.datetime.now(datetime.timezone.utc).timetuple())}000)/'
data = {
'streamRequests': [
{
@ -415,7 +415,7 @@ def _real_extract(self, url):
'cast': traverse_obj(delivery, ('Contributors', ..., 'DisplayName'), expected_type=lambda x: x or None),
'timestamp': session_start_time - 11640000000 if session_start_time else None,
'duration': delivery.get('Duration'),
'thumbnail': base_url + f'/Services/FrameGrabber.svc/FrameRedirect?objectId={video_id}&mode=Delivery&random={random()}',
'thumbnail': base_url + f'/Services/FrameGrabber.svc/FrameRedirect?objectId={video_id}&mode=Delivery&random={random.random()}',
'average_rating': delivery.get('AverageRating'),
'chapters': self._extract_chapters(timestamps),
'uploader': delivery.get('OwnerDisplayName') or None,

View file

@ -1,5 +1,5 @@
from uuid import uuid4
import json
import uuid
from .common import InfoExtractor
from ..utils import (
@ -51,7 +51,7 @@ def _real_extract(self, url):
}
def _call_api(self, endpoint, media_id, method, params):
rand_uuid = str(uuid4())
rand_uuid = str(uuid.uuid4())
res = self._download_json(
f'https://b2c-mobile.redefine.pl/rpc/{endpoint}/', media_id,
note=f'Downloading {method} JSON metadata',

View file

@ -1,5 +1,6 @@
import datetime
import json
from urllib.parse import unquote
import urllib.parse
from .common import InfoExtractor
from ..compat import functools
@ -114,7 +115,7 @@ def _maximum_flags(self):
cookies = self._get_cookies(self.BASE_URL)
if 'me' not in cookies:
self._download_webpage(self.BASE_URL, None, 'Refreshing verification information')
if traverse_obj(cookies, ('me', {lambda x: x.value}, {unquote}, {json.loads}, 'verified')):
if traverse_obj(cookies, ('me', {lambda x: x.value}, {urllib.parse.unquote}, {json.loads}, 'verified')):
flags |= 0b00110
return flags
@ -196,6 +197,7 @@ def _real_extract(self, url):
'like_count': ('up', {int}),
'dislike_count': ('down', {int}),
'timestamp': ('created', {int}),
'upload_date': ('created', {int}, {datetime.date.fromtimestamp}, {lambda x: x.strftime('%Y%m%d')}),
'thumbnail': ('thumb', {lambda x: urljoin('https://thumb.pr0gramm.com', x)})
}),
}

View file

@ -1,6 +1,6 @@
import hashlib
import re
from hashlib import sha1
from .common import InfoExtractor
from ..compat import compat_str
from ..utils import (
@ -42,7 +42,7 @@ def _extract_video_info(self, url, clip_id):
'Downloading protocols JSON',
headers=self.geo_verification_headers(), query={
'access_id': self._ACCESS_ID,
'client_token': sha1((raw_ct).encode()).hexdigest(),
'client_token': hashlib.sha1((raw_ct).encode()).hexdigest(),
'video_id': clip_id,
}, fatal=False, expected_status=(403,)) or {}
error = protocols.get('error') or {}
@ -53,7 +53,7 @@ def _extract_video_info(self, url, clip_id):
urls = (self._download_json(
self._V4_BASE_URL + 'urls', clip_id, 'Downloading urls JSON', query={
'access_id': self._ACCESS_ID,
'client_token': sha1((raw_ct + server_token + self._SUPPORTED_PROTOCOLS).encode()).hexdigest(),
'client_token': hashlib.sha1((raw_ct + server_token + self._SUPPORTED_PROTOCOLS).encode()).hexdigest(),
'protocols': self._SUPPORTED_PROTOCOLS,
'server_token': server_token,
'video_id': clip_id,
@ -77,7 +77,7 @@ def _extract_video_info(self, url, clip_id):
if not formats:
source_ids = [compat_str(source['id']) for source in video['sources']]
client_id = self._SALT[:2] + sha1(''.join([clip_id, self._SALT, self._TOKEN, client_location, self._SALT, self._CLIENT_NAME]).encode('utf-8')).hexdigest()
client_id = self._SALT[:2] + hashlib.sha1(''.join([clip_id, self._SALT, self._TOKEN, client_location, self._SALT, self._CLIENT_NAME]).encode('utf-8')).hexdigest()
sources = self._download_json(
'http://vas.sim-technik.de/vas/live/v2/videos/%s/sources' % clip_id,
@ -96,7 +96,7 @@ def fix_bitrate(bitrate):
return (bitrate // 1000) if bitrate % 1000 == 0 else bitrate
for source_id in source_ids:
client_id = self._SALT[:2] + sha1(''.join([self._SALT, clip_id, self._TOKEN, server_id, client_location, source_id, self._SALT, self._CLIENT_NAME]).encode('utf-8')).hexdigest()
client_id = self._SALT[:2] + hashlib.sha1(''.join([self._SALT, clip_id, self._TOKEN, server_id, client_location, source_id, self._SALT, self._CLIENT_NAME]).encode('utf-8')).hexdigest()
urls = self._download_json(
'http://vas.sim-technik.de/vas/live/v2/videos/%s/sources/url' % clip_id,
clip_id, 'Downloading urls JSON', fatal=False, query={

View file

@ -1,18 +1,14 @@
from .common import InfoExtractor
from ..utils import (
clean_html,
traverse_obj,
unescapeHTML,
)
import itertools
from urllib.parse import urlencode
import urllib.parse
from .common import InfoExtractor
from ..utils import clean_html, traverse_obj, unescapeHTML
class RadioKapitalBaseIE(InfoExtractor):
def _call_api(self, resource, video_id, note='Downloading JSON metadata', qs={}):
return self._download_json(
f'https://www.radiokapital.pl/wp-json/kapital/v1/{resource}?{urlencode(qs)}',
f'https://www.radiokapital.pl/wp-json/kapital/v1/{resource}?{urllib.parse.urlencode(qs)}',
video_id, note=note)
def _parse_episode(self, data):

View file

@ -1,8 +1,8 @@
import datetime
import itertools
import json
import re
import urllib.parse
from datetime import datetime
from .common import InfoExtractor, SearchInfoExtractor
from ..utils import (
@ -156,7 +156,7 @@ def _real_extract(self, url):
self.raise_login_required('This video is only available to premium users', True, method='cookies')
elif scheduled:
self.raise_no_formats(
f'Stream is offline; scheduled for {datetime.fromtimestamp(scheduled).strftime("%Y-%m-%d %H:%M:%S")}',
f'Stream is offline; scheduled for {datetime.datetime.fromtimestamp(scheduled).strftime("%Y-%m-%d %H:%M:%S")}',
video_id=video_id, expected=True)
uploader = traverse_obj(metadata, ('createdBy', 'username'), ('creator', 'username'))

View file

@ -1,8 +1,7 @@
from __future__ import annotations
import functools
import json
from functools import partial
from textwrap import dedent
import textwrap
from .common import InfoExtractor
from ..utils import ExtractorError, format_field, int_or_none, parse_iso8601
@ -10,7 +9,7 @@
def _fmt_url(url):
return partial(format_field, template=url, default=None)
return functools.partial(format_field, template=url, default=None)
class TelewebionIE(InfoExtractor):
@ -88,7 +87,7 @@ def _real_extract(self, url):
if not video_id.startswith('0x'):
video_id = hex(int(video_id))
episode_data = self._call_graphql_api('getEpisodeDetail', video_id, dedent('''
episode_data = self._call_graphql_api('getEpisodeDetail', video_id, textwrap.dedent('''
queryEpisode(filter: {EpisodeID: $EpisodeId}, first: 1) {
title
program {
@ -127,7 +126,7 @@ def _real_extract(self, url):
'formats': (
'channel', 'descriptor', {str},
{_fmt_url(f'https://cdna.telewebion.com/%s/episode/{video_id}/playlist.m3u8')},
{partial(self._extract_m3u8_formats, video_id=video_id, ext='mp4', m3u8_id='hls')}),
{functools.partial(self._extract_m3u8_formats, video_id=video_id, ext='mp4', m3u8_id='hls')}),
}))
info_dict['id'] = video_id
return info_dict

View file

@ -1,7 +1,7 @@
import base64
import datetime
import functools
import itertools
from datetime import datetime
from .common import InfoExtractor
from ..networking import HEADRequest
@ -70,7 +70,7 @@ def _get_bearer_token(self, video_id):
username, password = self._get_login_info()
if username is None or password is None:
self.raise_login_required('Your 10play account\'s details must be provided with --username and --password.')
_timestamp = datetime.now().strftime('%Y%m%d000000')
_timestamp = datetime.datetime.now().strftime('%Y%m%d000000')
_auth_header = base64.b64encode(_timestamp.encode('ascii')).decode('ascii')
data = self._download_json('https://10play.com.au/api/user/auth', video_id, 'Getting bearer token', headers={
'X-Network-Ten-Auth': _auth_header,

View file

@ -1,6 +1,6 @@
import base64
import re
import urllib.parse
from base64 import b64decode
from .common import InfoExtractor
from ..networking import HEADRequest
@ -371,7 +371,7 @@ def _real_extract(self, url):
webpage = self._download_webpage(f'https://fast.wistia.net/embed/channel/{channel_id}', channel_id)
data = self._parse_json(
self._search_regex(r'wchanneljsonp-%s\'\]\s*=[^\"]*\"([A-Za-z0-9=/]*)' % channel_id, webpage, 'jsonp', channel_id),
channel_id, transform_source=lambda x: urllib.parse.unquote_plus(b64decode(x).decode('utf-8')))
channel_id, transform_source=lambda x: urllib.parse.unquote_plus(base64.b64decode(x).decode('utf-8')))
# XXX: can there be more than one series?
series = traverse_obj(data, ('series', 0), default={})

View file

@ -1,5 +1,5 @@
import re
from uuid import uuid4
import uuid
from .common import InfoExtractor
from ..compat import compat_str
@ -53,7 +53,7 @@ def _initialize_pre_login(self):
self._request_webpage(
'%s/zapi/v3/session/hello' % self._host_url(), None,
'Opening session', data=urlencode_postdata({
'uuid': compat_str(uuid4()),
'uuid': compat_str(uuid.uuid4()),
'lang': 'en',
'app_version': '1.8.2',
'format': 'json',