[youtube] Simplify _get_text early

This commit is contained in:
pukkandan 2021-07-24 06:16:46 +05:30
parent cb89cfc14b
commit 052e135029
No known key found for this signature in database
GPG key ID: 0F00D95A001F4698
2 changed files with 38 additions and 38 deletions

View file

@ -1,4 +1,5 @@
{ {
"check_formats": false,
"consoletitle": false, "consoletitle": false,
"continuedl": true, "continuedl": true,
"forcedescription": false, "forcedescription": false,

View file

@ -691,7 +691,7 @@ def _extract_alerts(cls, data):
alert_type = alert.get('type') alert_type = alert.get('type')
if not alert_type: if not alert_type:
continue continue
message = cls._get_text(alert.get('text')) message = cls._get_text(alert, 'text')
if message: if message:
yield alert_type, message yield alert_type, message
@ -721,23 +721,26 @@ def _extract_badges(self, renderer: dict):
return badges return badges
@staticmethod @staticmethod
- def _get_text(data, getter=None, max_runs=None):
-     for get in variadic(getter):
-         d = try_get(data, get) if get is not None else data
-         text = try_get(d, lambda x: x['simpleText'], compat_str)
-         if text:
-             return text
-         runs = try_get(d, lambda x: x['runs'], list) or []
-         if not runs and isinstance(d, list):
-             runs = d
-         def get_runs(runs):
-             for run in runs[:min(len(runs), max_runs or len(runs))]:
-                 yield try_get(run, lambda x: x['text'], compat_str) or ''
-         text = ''.join(get_runs(runs))
-         if text:
-             return text
+ def _get_text(data, *path_list, max_runs=None):
+     for path in path_list or [None]:
+         if path is None:
+             obj = [data]
+         else:
+             obj = traverse_obj(data, path, default=[])
+             if not any(key is ... or isinstance(key, (list, tuple)) for key in variadic(path)):
+                 obj = [obj]
+         for item in obj:
+             text = try_get(item, lambda x: x['simpleText'], compat_str)
+             if text:
+                 return text
+             runs = try_get(item, lambda x: x['runs'], list) or []
+             if not runs and isinstance(item, list):
+                 runs = item
+             runs = runs[:min(len(runs), max_runs or len(runs))]
+             text = ''.join(traverse_obj(runs, (..., 'text'), expected_type=str, default=[]))
+             if text:
+                 return text
def _extract_response(self, item_id, query, note='Downloading API JSON', headers=None, def _extract_response(self, item_id, query, note='Downloading API JSON', headers=None,
ytcfg=None, check_get_keys=None, ep='browse', fatal=True, api_hostname=None, ytcfg=None, check_get_keys=None, ep='browse', fatal=True, api_hostname=None,
@ -804,15 +807,15 @@ def is_music_url(url):
def _extract_video(self, renderer): def _extract_video(self, renderer):
video_id = renderer.get('videoId') video_id = renderer.get('videoId')
title = self._get_text(renderer.get('title')) title = self._get_text(renderer, 'title')
description = self._get_text(renderer.get('descriptionSnippet')) description = self._get_text(renderer, 'descriptionSnippet')
duration = parse_duration(self._get_text(renderer.get('lengthText'))) duration = parse_duration(self._get_text(renderer, 'lengthText'))
view_count_text = self._get_text(renderer.get('viewCountText')) or '' view_count_text = self._get_text(renderer, 'viewCountText') or ''
view_count = str_to_int(self._search_regex( view_count = str_to_int(self._search_regex(
r'^([\d,]+)', re.sub(r'\s', '', view_count_text), r'^([\d,]+)', re.sub(r'\s', '', view_count_text),
'view count', default=None)) 'view count', default=None))
uploader = self._get_text(renderer, (lambda x: x['ownerText'], lambda x: x['shortBylineText'])) uploader = self._get_text(renderer, 'ownerText', 'shortBylineText')
return { return {
'_type': 'url', '_type': 'url',
@ -2028,8 +2031,8 @@ def _extract_chapters_from_engagement_panel(self, data, duration):
data, data,
('engagementPanels', ..., 'engagementPanelSectionListRenderer', 'content', 'macroMarkersListRenderer', 'contents'), ('engagementPanels', ..., 'engagementPanelSectionListRenderer', 'content', 'macroMarkersListRenderer', 'contents'),
expected_type=list, default=[]) expected_type=list, default=[])
chapter_time = lambda chapter: parse_duration(self._get_text(chapter.get('timeDescription'))) chapter_time = lambda chapter: parse_duration(self._get_text(chapter, 'timeDescription'))
chapter_title = lambda chapter: self._get_text(chapter.get('title')) chapter_title = lambda chapter: self._get_text(chapter, 'title')
return next(( return next((
filter(None, ( filter(None, (
@ -2083,14 +2086,14 @@ def _extract_comment(self, comment_renderer, parent=None):
if not comment_id: if not comment_id:
return return
text = self._get_text(comment_renderer.get('contentText')) text = self._get_text(comment_renderer, 'contentText')
# note: timestamp is an estimate calculated from the current time and time_text # note: timestamp is an estimate calculated from the current time and time_text
time_text = self._get_text(comment_renderer.get('publishedTimeText')) or '' time_text = self._get_text(comment_renderer, 'publishedTimeText') or ''
time_text_dt = self.parse_time_text(time_text) time_text_dt = self.parse_time_text(time_text)
if isinstance(time_text_dt, datetime.datetime): if isinstance(time_text_dt, datetime.datetime):
timestamp = calendar.timegm(time_text_dt.timetuple()) timestamp = calendar.timegm(time_text_dt.timetuple())
author = self._get_text(comment_renderer.get('authorText')) author = self._get_text(comment_renderer, 'authorText')
author_id = try_get(comment_renderer, author_id = try_get(comment_renderer,
lambda x: x['authorEndpoint']['browseEndpoint']['browseId'], compat_str) lambda x: x['authorEndpoint']['browseEndpoint']['browseId'], compat_str)
@ -2125,7 +2128,7 @@ def extract_header(contents):
for content in contents: for content in contents:
comments_header_renderer = try_get(content, lambda x: x['commentsHeaderRenderer']) comments_header_renderer = try_get(content, lambda x: x['commentsHeaderRenderer'])
expected_comment_count = parse_count(self._get_text( expected_comment_count = parse_count(self._get_text(
comments_header_renderer, (lambda x: x['countText'], lambda x: x['commentsCount']), max_runs=1)) comments_header_renderer, 'countText', 'commentsCount', max_runs=1))
if expected_comment_count: if expected_comment_count:
comment_counts[1] = expected_comment_count comment_counts[1] = expected_comment_count
@ -3001,10 +3004,7 @@ def process_language(container, base_url, lang_code, sub_name, query):
}) })
vsir = content.get('videoSecondaryInfoRenderer') vsir = content.get('videoSecondaryInfoRenderer')
if vsir: if vsir:
- info['channel'] = self._get_text(try_get(
-     vsir,
-     lambda x: x['owner']['videoOwnerRenderer']['title'],
-     dict))
+ info['channel'] = self._get_text(vsir, ('owner', 'videoOwnerRenderer', 'title'))
rows = try_get( rows = try_get(
vsir, vsir,
lambda x: x['metadataRowContainer']['metadataRowContainerRenderer']['rows'], lambda x: x['metadataRowContainer']['metadataRowContainerRenderer']['rows'],
@ -3019,8 +3019,8 @@ def process_language(container, base_url, lang_code, sub_name, query):
mrr_title = mrr.get('title') mrr_title = mrr.get('title')
if not mrr_title: if not mrr_title:
continue continue
mrr_title = self._get_text(mrr['title']) mrr_title = self._get_text(mrr, 'title')
mrr_contents_text = self._get_text(mrr['contents'][0]) mrr_contents_text = self._get_text(mrr, ('contents', 0))
if mrr_title == 'License': if mrr_title == 'License':
info['license'] = mrr_contents_text info['license'] = mrr_contents_text
elif not multiple_songs: elif not multiple_songs:
@ -3592,7 +3592,7 @@ def _grid_entries(self, grid_renderer):
renderer = self._extract_basic_item_renderer(item) renderer = self._extract_basic_item_renderer(item)
if not isinstance(renderer, dict): if not isinstance(renderer, dict):
continue continue
title = self._get_text(renderer.get('title')) title = self._get_text(renderer, 'title')
# playlist # playlist
playlist_id = renderer.get('playlistId') playlist_id = renderer.get('playlistId')
@ -3652,7 +3652,7 @@ def _shelf_entries(self, shelf_renderer, skip_channels=False):
# will not work # will not work
if skip_channels and '/channels?' in shelf_url: if skip_channels and '/channels?' in shelf_url:
return return
title = self._get_text(shelf_renderer, lambda x: x['title']) title = self._get_text(shelf_renderer, 'title')
yield self.url_result(shelf_url, video_title=title) yield self.url_result(shelf_url, video_title=title)
# Shelf may not contain shelf URL, fallback to extraction from content # Shelf may not contain shelf URL, fallback to extraction from content
for entry in self._shelf_entries_from_content(shelf_renderer): for entry in self._shelf_entries_from_content(shelf_renderer):
@ -4026,8 +4026,7 @@ def _extract_availability(self, data):
renderer_dict, lambda x: x['privacyDropdownItemRenderer']['isSelected'], bool) or False renderer_dict, lambda x: x['privacyDropdownItemRenderer']['isSelected'], bool) or False
if not is_selected: if not is_selected:
continue continue
- label = self._get_text(
-     try_get(renderer_dict, lambda x: x['privacyDropdownItemRenderer']['label'], dict) or [])
+ label = self._get_text(renderer_dict, ('privacyDropdownItemRenderer', 'label'))
if label: if label:
badge_labels.add(label.lower()) badge_labels.add(label.lower())
break break