[extractor] Add _search_json

All fetching of JSON objects should eventually be done with this function,
but only `youtube` is being refactored for now.
This commit is contained in:
pukkandan 2022-06-03 21:02:31 +05:30
parent 00bbc5f177
commit b7c47b7438
No known key found for this signature in database
GPG key ID: 7EEE9E1E817D0A39
4 changed files with 42 additions and 39 deletions

View file

@ -442,9 +442,10 @@ class YoutubeWebArchiveIE(InfoExtractor):
'only_matching': True 'only_matching': True
}, },
] ]
_YT_INITIAL_DATA_RE = r'(?:(?:(?:window\s*\[\s*["\']ytInitialData["\']\s*\]|ytInitialData)\s*=\s*({.+?})\s*;)|%s)' % YoutubeBaseInfoExtractor._YT_INITIAL_DATA_RE _YT_INITIAL_DATA_RE = YoutubeBaseInfoExtractor._YT_INITIAL_DATA_RE
_YT_INITIAL_PLAYER_RESPONSE_RE = r'(?:(?:(?:window\s*\[\s*["\']ytInitialPlayerResponse["\']\s*\]|ytInitialPlayerResponse)\s*=[(\s]*({.+?})[)\s]*;)|%s)' % YoutubeBaseInfoExtractor._YT_INITIAL_PLAYER_RESPONSE_RE _YT_INITIAL_PLAYER_RESPONSE_RE = fr'''(?x)
_YT_INITIAL_BOUNDARY_RE = r'(?:(?:var\s+meta|</script|\n)|%s)' % YoutubeBaseInfoExtractor._YT_INITIAL_BOUNDARY_RE (?:window\s*\[\s*["\']ytInitialPlayerResponse["\']\s*\]|ytInitialPlayerResponse)\s*=[(\s]*|
{YoutubeBaseInfoExtractor._YT_INITIAL_PLAYER_RESPONSE_RE}'''
_YT_DEFAULT_THUMB_SERVERS = ['i.ytimg.com'] # thumbnails most likely archived on these servers _YT_DEFAULT_THUMB_SERVERS = ['i.ytimg.com'] # thumbnails most likely archived on these servers
_YT_ALL_THUMB_SERVERS = orderedSet( _YT_ALL_THUMB_SERVERS = orderedSet(
@ -474,11 +475,6 @@ def _call_cdx_api(self, item_id, url, filters: list = None, collapse: list = Non
elif not isinstance(res, list) or len(res) != 0: elif not isinstance(res, list) or len(res) != 0:
self.report_warning('Error while parsing CDX API response' + bug_reports_message()) self.report_warning('Error while parsing CDX API response' + bug_reports_message())
def _extract_yt_initial_variable(self, webpage, regex, video_id, name):
    """Locate and parse a JSON page variable (e.g. ytInitialData) in *webpage*.

    First tries *regex* followed by the boundary pattern to bound the match,
    then falls back to the bare *regex*.  Both the search (default '{}') and
    the parse (fatal=False) are best-effort, so failures yield a falsy value.
    """
    patterns = (fr'{regex}\s*{self._YT_INITIAL_BOUNDARY_RE}', regex)
    raw_json = self._search_regex(patterns, webpage, name, default='{}')
    return self._parse_json(raw_json, video_id, fatal=False)
def _extract_webpage_title(self, webpage): def _extract_webpage_title(self, webpage):
page_title = self._html_extract_title(webpage, default='') page_title = self._html_extract_title(webpage, default='')
# YouTube video pages appear to always have either 'YouTube -' as prefix or '- YouTube' as suffix. # YouTube video pages appear to always have either 'YouTube -' as prefix or '- YouTube' as suffix.
@ -488,10 +484,11 @@ def _extract_webpage_title(self, webpage):
def _extract_metadata(self, video_id, webpage): def _extract_metadata(self, video_id, webpage):
search_meta = ((lambda x: self._html_search_meta(x, webpage, default=None)) if webpage else (lambda x: None)) search_meta = ((lambda x: self._html_search_meta(x, webpage, default=None)) if webpage else (lambda x: None))
player_response = self._extract_yt_initial_variable( player_response = self._search_json(
webpage, self._YT_INITIAL_PLAYER_RESPONSE_RE, video_id, 'initial player response') or {} self._YT_INITIAL_PLAYER_RESPONSE_RE, webpage, 'initial player response',
initial_data = self._extract_yt_initial_variable( video_id, fatal=False)
webpage, self._YT_INITIAL_DATA_RE, video_id, 'initial player response') or {} initial_data = self._search_json(
self._YT_INITIAL_DATA_RE, webpage, 'initial data', video_id, fatal=False)
initial_data_video = traverse_obj( initial_data_video = traverse_obj(
initial_data, ('contents', 'twoColumnWatchNextResults', 'results', 'results', 'contents', ..., 'videoPrimaryInfoRenderer'), initial_data, ('contents', 'twoColumnWatchNextResults', 'results', 'results', 'contents', ..., 'videoPrimaryInfoRenderer'),

View file

@ -35,6 +35,7 @@
ExtractorError, ExtractorError,
GeoRestrictedError, GeoRestrictedError,
GeoUtils, GeoUtils,
LenientJSONDecoder,
RegexNotFoundError, RegexNotFoundError,
UnsupportedError, UnsupportedError,
age_restricted, age_restricted,
@ -930,19 +931,10 @@ def _parse_xml(self, xml_string, video_id, transform_source=None, fatal=True):
else: else:
self.report_warning(errmsg + str(ve)) self.report_warning(errmsg + str(ve))
def _parse_json(self, json_string, video_id, transform_source=None, fatal=True, lenient=False): def _parse_json(self, json_string, video_id, transform_source=None, fatal=True, **parser_kwargs):
if transform_source:
json_string = transform_source(json_string)
try: try:
try: return json.loads(
return json.loads(json_string, strict=False) json_string, cls=LenientJSONDecoder, strict=False, transform_source=transform_source, **parser_kwargs)
except json.JSONDecodeError as e:
if not lenient:
raise
try:
return json.loads(json_string[:e.pos], strict=False)
except ValueError:
raise e
except ValueError as ve: except ValueError as ve:
errmsg = f'{video_id}: Failed to parse JSON' errmsg = f'{video_id}: Failed to parse JSON'
if fatal: if fatal:
@ -1196,6 +1188,14 @@ def _search_regex(self, pattern, string, name, default=NO_DEFAULT, fatal=True, f
self.report_warning('unable to extract %s' % _name + bug_reports_message()) self.report_warning('unable to extract %s' % _name + bug_reports_message())
return None return None
def _search_json(self, start_pattern, string, name, video_id, *, end_pattern='', fatal=True, **kwargs):
    """Search *string* for the JSON object introduced by *start_pattern*.

    *end_pattern* only serves to shrink the initial regex match; the lenient
    parser (ignore_extra=True) drops any trailing non-JSON text regardless.
    Returns the parsed object, or {} when nothing matched or parsed.
    """
    full_pattern = rf'{start_pattern}\s*(?P<json>{{.+}})\s*{end_pattern}'
    json_text = self._search_regex(full_pattern, string, name, group='json', fatal=fatal)
    parsed = self._parse_json(json_text or '{}', video_id, fatal=fatal, ignore_extra=True, **kwargs)
    return parsed or {}
def _html_search_regex(self, pattern, string, name, default=NO_DEFAULT, fatal=True, flags=0, group=None): def _html_search_regex(self, pattern, string, name, default=NO_DEFAULT, fatal=True, flags=0, group=None):
""" """
Like _search_regex, but strips HTML tags and unescapes entities. Like _search_regex, but strips HTML tags and unescapes entities.

View file

@ -397,9 +397,8 @@ def _check_login_required(self):
if self._LOGIN_REQUIRED and not self._cookies_passed: if self._LOGIN_REQUIRED and not self._cookies_passed:
self.raise_login_required('Login details are needed to download this content', method='cookies') self.raise_login_required('Login details are needed to download this content', method='cookies')
_YT_INITIAL_DATA_RE = r'(?:window\s*\[\s*["\']ytInitialData["\']\s*\]|ytInitialData)\s*=\s*({.+})\s*;' _YT_INITIAL_DATA_RE = r'(?:window\s*\[\s*["\']ytInitialData["\']\s*\]|ytInitialData)\s*='
_YT_INITIAL_PLAYER_RESPONSE_RE = r'ytInitialPlayerResponse\s*=\s*({.+})\s*;' _YT_INITIAL_PLAYER_RESPONSE_RE = r'ytInitialPlayerResponse\s*='
_YT_INITIAL_BOUNDARY_RE = r'(?:var\s+meta|</script|\n)'
def _get_default_ytcfg(self, client='web'): def _get_default_ytcfg(self, client='web'):
return copy.deepcopy(INNERTUBE_CLIENTS[client]) return copy.deepcopy(INNERTUBE_CLIENTS[client])
@ -476,12 +475,8 @@ def _call_api(self, ep, query, video_id, fatal=True, headers=None,
data=json.dumps(data).encode('utf8'), headers=real_headers, data=json.dumps(data).encode('utf8'), headers=real_headers,
query={'key': api_key or self._extract_api_key(), 'prettyPrint': 'false'}) query={'key': api_key or self._extract_api_key(), 'prettyPrint': 'false'})
def extract_yt_initial_data(self, item_id, webpage, fatal=True): def extract_yt_initial_data(self, item_id, webpage):
data = self._search_regex( return self._search_json(self._YT_INITIAL_DATA_RE, webpage, 'yt initial data', item_id, fatal=True)
(fr'{self._YT_INITIAL_DATA_RE}\s*{self._YT_INITIAL_BOUNDARY_RE}',
self._YT_INITIAL_DATA_RE), webpage, 'yt initial data', fatal=fatal)
if data:
return self._parse_json(data, item_id, fatal=fatal)
def _extract_yt_initial_variable(self, webpage, regex, video_id, name): def _extract_yt_initial_variable(self, webpage, regex, video_id, name):
return self._parse_json(self._search_regex( return self._parse_json(self._search_regex(
@ -3052,9 +3047,8 @@ def _get_requested_clients(self, url, smuggled_data):
def _extract_player_responses(self, clients, video_id, webpage, master_ytcfg): def _extract_player_responses(self, clients, video_id, webpage, master_ytcfg):
initial_pr = None initial_pr = None
if webpage: if webpage:
initial_pr = self._extract_yt_initial_variable( initial_pr = self._search_json(
webpage, self._YT_INITIAL_PLAYER_RESPONSE_RE, self._YT_INITIAL_PLAYER_RESPONSE_RE, webpage, 'initial player response', video_id, fatal=False)
video_id, 'initial player response')
all_clients = set(clients) all_clients = set(clients)
clients = clients[::-1] clients = clients[::-1]
@ -3678,9 +3672,8 @@ def process_language(container, base_url, lang_code, sub_name, query):
initial_data = None initial_data = None
if webpage: if webpage:
initial_data = self._extract_yt_initial_variable( initial_data = self._search_json(
webpage, self._YT_INITIAL_DATA_RE, video_id, self._YT_INITIAL_DATA_RE, webpage, 'yt initial data', video_id, fatal=False)
'yt initial data')
if not initial_data: if not initial_data:
query = {'videoId': video_id} query = {'videoId': video_id}
query.update(self._get_checkok_params()) query.update(self._get_checkok_params())

View file

@ -594,6 +594,19 @@ def clean_html(html):
return html.strip() return html.strip()
class LenientJSONDecoder(json.JSONDecoder):
    """A JSONDecoder that can pre-process its input and tolerate trailing text.

    transform_source: optional callable applied to the raw string before decoding
    ignore_extra:     if True, decode only the first JSON value and silently
                      discard whatever follows it
    """

    def __init__(self, *args, transform_source=None, ignore_extra=False, **kwargs):
        self.transform_source = transform_source
        self.ignore_extra = ignore_extra
        super().__init__(*args, **kwargs)

    def decode(self, s):
        if self.transform_source:
            s = self.transform_source(s)
        if not self.ignore_extra:
            return super().decode(s)
        # raw_decode returns (object, end_index); keep the object, drop the rest
        obj, _ = self.raw_decode(s.lstrip())
        return obj
def sanitize_open(filename, open_mode): def sanitize_open(filename, open_mode):
"""Try to open the given filename, and slightly tweak it if this fails. """Try to open the given filename, and slightly tweak it if this fails.