[youtube] Sanity check chapters (and refactor related code)

Closes #520
This commit is contained in:
pukkandan 2021-07-20 05:32:41 +05:30
parent 3f698246b2
commit 7c365c2109
No known key found for this signature in database
GPG key ID: 0F00D95A001F4698

View file

@ -48,6 +48,7 @@
smuggle_url, smuggle_url,
str_or_none, str_or_none,
str_to_int, str_to_int,
traverse_obj,
try_get, try_get,
unescapeHTML, unescapeHTML,
unified_strdate, unified_strdate,
@ -56,7 +57,7 @@
url_or_none, url_or_none,
urlencode_postdata, urlencode_postdata,
urljoin, urljoin,
variadic variadic,
) )
@ -1930,44 +1931,56 @@ def extract_id(cls, url):
video_id = mobj.group(2) video_id = mobj.group(2)
return video_id return video_id
def _extract_chapters_from_json(self, data, video_id, duration): def _extract_chapters_from_json(self, data, duration):
chapters_list = try_get( chapter_list = traverse_obj(
data, data, (
lambda x: x['playerOverlays'] 'playerOverlays', 'playerOverlayRenderer', 'decoratedPlayerBarRenderer',
['playerOverlayRenderer'] 'decoratedPlayerBarRenderer', 'playerBar', 'chapteredPlayerBarRenderer', 'chapters'
['decoratedPlayerBarRenderer'] ), expected_type=list)
['decoratedPlayerBarRenderer']
['playerBar']
['chapteredPlayerBarRenderer']
['chapters'],
list)
if not chapters_list:
return
def chapter_time(chapter): return self._extract_chapters(
return float_or_none( chapter_list,
try_get( chapter_time=lambda chapter: float_or_none(
chapter, traverse_obj(chapter, ('chapterRenderer', 'timeRangeStartMillis')), scale=1000),
lambda x: x['chapterRenderer']['timeRangeStartMillis'], chapter_title=lambda chapter: traverse_obj(
int), chapter, ('chapterRenderer', 'title', 'simpleText'), expected_type=str),
scale=1000) duration=duration)
def _extract_chapters_from_engagement_panel(self, data, duration):
content_list = traverse_obj(
data,
('engagementPanels', ..., 'engagementPanelSectionListRenderer', 'content', 'macroMarkersListRenderer', 'contents'),
expected_type=list)
chapter_time = lambda chapter: parse_duration(self._get_text(chapter.get('timeDescription')))
chapter_title = lambda chapter: self._get_text(chapter.get('title'))
return next((
filter(None, (
self._extract_chapters(
traverse_obj(contents, (..., 'macroMarkersListItemRenderer')),
chapter_time, chapter_title, duration)
for contents in content_list
))), [])
def _extract_chapters(self, chapter_list, chapter_time, chapter_title, duration):
chapters = [] chapters = []
for next_num, chapter in enumerate(chapters_list, start=1): last_chapter = {'start_time': 0}
for idx, chapter in enumerate(chapter_list or []):
title = chapter_title(chapter)
start_time = chapter_time(chapter) start_time = chapter_time(chapter)
if start_time is None: if start_time is None:
continue continue
end_time = (chapter_time(chapters_list[next_num]) last_chapter['end_time'] = start_time
if next_num < len(chapters_list) else duration) if start_time < last_chapter['start_time']:
if end_time is None: if idx == 1:
continue chapters.pop()
title = try_get( self.report_warning('Invalid start time for chapter "%s"' % last_chapter['title'])
chapter, lambda x: x['chapterRenderer']['title']['simpleText'], else:
compat_str) self.report_warning(f'Invalid start time for chapter "{title}"')
chapters.append({ continue
'start_time': start_time, last_chapter = {'start_time': start_time, 'title': title}
'end_time': end_time, chapters.append(last_chapter)
'title': title, last_chapter['end_time'] = duration
})
return chapters return chapters
def _extract_yt_initial_variable(self, webpage, regex, video_id, name): def _extract_yt_initial_variable(self, webpage, regex, video_id, name):
@ -2830,38 +2843,10 @@ def process_language(container, base_url, lang_code, sub_name, query):
pass pass
if initial_data: if initial_data:
chapters = self._extract_chapters_from_json( info['chapters'] = (
initial_data, video_id, duration) self._extract_chapters_from_json(initial_data, duration)
if not chapters: or self._extract_chapters_from_engagement_panel(initial_data, duration)
for engagment_pannel in (initial_data.get('engagementPanels') or []): or None)
contents = try_get(
engagment_pannel, lambda x: x['engagementPanelSectionListRenderer']['content']['macroMarkersListRenderer']['contents'],
list)
if not contents:
continue
def chapter_time(mmlir):
return parse_duration(
self._get_text(mmlir.get('timeDescription')))
chapters = []
for next_num, content in enumerate(contents, start=1):
mmlir = content.get('macroMarkersListItemRenderer') or {}
start_time = chapter_time(mmlir)
end_time = chapter_time(try_get(
contents, lambda x: x[next_num]['macroMarkersListItemRenderer'])) \
if next_num < len(contents) else duration
if start_time is None or end_time is None:
continue
chapters.append({
'start_time': start_time,
'end_time': end_time,
'title': self._get_text(mmlir.get('title')),
})
if chapters:
break
if chapters:
info['chapters'] = chapters
contents = try_get( contents = try_get(
initial_data, initial_data,