[atv.at] Use jwt for API (#1012)

The jwt token is implemented according to RFC7519

Closes #988
Authored by: NeroBurner
This commit is contained in:
NeroBurner 2021-09-23 19:40:51 +02:00 committed by GitHub
parent ee2b3563f3
commit 49fa4d9af7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 102 additions and 49 deletions

View file

@ -1,74 +1,106 @@
# coding: utf-8 # coding: utf-8
from __future__ import unicode_literals from __future__ import unicode_literals
import datetime
from .common import InfoExtractor from .common import InfoExtractor
from ..utils import ( from ..utils import (
determine_ext, float_or_none,
dict_get, jwt_encode_hs256,
int_or_none, try_get,
unescapeHTML,
) )
class ATVAtIE(InfoExtractor): class ATVAtIE(InfoExtractor):
_VALID_URL = r'https?://(?:www\.)?atv\.at/(?:[^/]+/){2}(?P<id>[dv]\d+)' _VALID_URL = r'https?://(?:www\.)?atv\.at/tv/(?:[^/]+/){2,3}(?P<id>.*)'
_TESTS = [{ _TESTS = [{
'url': 'https://www.atv.at/bauer-sucht-frau-die-zweite-chance/folge-1/d3390693/', 'url': 'https://www.atv.at/tv/bauer-sucht-frau/staffel-18/bauer-sucht-frau/bauer-sucht-frau-staffel-18-folge-3-die-hofwochen',
'md5': 'c471605591009dfb6e6c54f7e62e2807', 'md5': '3c3b4aaca9f63e32b35e04a9c2515903',
'info_dict': { 'info_dict': {
'id': '3390684', 'id': 'v-ce9cgn1e70n5-1',
'ext': 'mp4', 'ext': 'mp4',
'title': 'Bauer sucht Frau - Die zweite Chance Folge 1', 'title': 'Bauer sucht Frau - Staffel 18 Folge 3 - Die Hofwochen',
} }
}, { }, {
'url': 'https://www.atv.at/bauer-sucht-frau-staffel-17/fuenfte-eventfolge/d3339537/', 'url': 'https://www.atv.at/tv/bauer-sucht-frau/staffel-18/episode-01/bauer-sucht-frau-staffel-18-vorstellungsfolge-1',
'only_matching': True, 'only_matching': True,
}] }]
def _process_source_entry(self, source, part_id): # extracted from bootstrap.js function (search for e.encryption_key and use your browser's debugger)
source_url = source.get('url') _ACCESS_ID = 'x_atv'
if not source_url: _ENCRYPTION_KEY = 'Hohnaekeishoogh2omaeghooquooshia'
return
if determine_ext(source_url) == 'm3u8':
return self._extract_m3u8_formats(
source_url, part_id, 'mp4', 'm3u8_native',
m3u8_id='hls', fatal=False)
else:
return [{
'url': source_url,
}]
def _process_entry(self, entry): def _extract_video_info(self, url, content, video):
part_id = entry.get('id') clip_id = content.get('splitId', content['id'])
if not part_id:
return
formats = [] formats = []
for source in entry.get('sources', []): clip_urls = video['urls']
formats.extend(self._process_source_entry(source, part_id) or []) for protocol, variant in clip_urls.items():
source_url = try_get(variant, lambda x: x['clear']['url'])
if not source_url:
continue
if protocol == 'dash':
formats.extend(self._extract_mpd_formats(
source_url, clip_id, mpd_id=protocol, fatal=False))
elif protocol == 'hls':
formats.extend(self._extract_m3u8_formats(
source_url, clip_id, 'mp4', 'm3u8_native',
m3u8_id=protocol, fatal=False))
else:
formats.append({
'url': source_url,
'format_id': protocol,
})
self._sort_formats(formats) self._sort_formats(formats)
return { return {
'id': part_id, 'id': clip_id,
'title': entry.get('title'), 'title': content.get('title'),
'duration': int_or_none(entry.get('duration')), 'duration': float_or_none(content.get('duration')),
'formats': formats 'series': content.get('tvShowTitle'),
'formats': formats,
} }
def _real_extract(self, url): def _real_extract(self, url):
display_id = self._match_id(url) video_id = self._match_id(url)
webpage = self._download_webpage(url, display_id) webpage = self._download_webpage(url, video_id)
video_data = self._parse_json(unescapeHTML(self._search_regex( json_data = self._parse_json(
r'var\splaylist\s*=\s*(?P<json>\[.*\]);', self._search_regex(r'<script id="state" type="text/plain">(.*)</script>', webpage, 'json_data'),
webpage, 'player data', group='json')), video_id=video_id)
display_id)
first_video = video_data[0] video_title = json_data['views']['default']['page']['title']
video_id = first_video['id'] contentResource = json_data['views']['default']['page']['contentResource']
video_title = dict_get(first_video, ('tvShowTitle', 'title')) content_id = contentResource[0]['id']
content_ids = [{'id': id, 'subclip_start': content['start'], 'subclip_end': content['end']}
for id, content in enumerate(contentResource)]
time_of_request = datetime.datetime.now()
not_before = time_of_request - datetime.timedelta(minutes=5)
expire = time_of_request + datetime.timedelta(minutes=5)
payload = {
'content_ids': {
content_id: content_ids,
},
'secure_delivery': True,
'iat': int(time_of_request.timestamp()),
'nbf': int(not_before.timestamp()),
'exp': int(expire.timestamp()),
}
jwt_token = jwt_encode_hs256(payload, self._ENCRYPTION_KEY, headers={'kid': self._ACCESS_ID})
videos = self._download_json(
'https://vas-v4.p7s1video.net/4.0/getsources',
content_id, 'Downloading videos JSON', query={
'token': jwt_token.decode('utf-8')
})
video_id, videos_data = list(videos['data'].items())[0]
entries = [
self._extract_video_info(url, contentResource[video['id']], video)
for video in videos_data]
return { return {
'_type': 'multi_video', '_type': 'multi_video',
'id': video_id, 'id': video_id,
'title': video_title, 'title': video_title,
'entries': (self._process_entry(entry) for entry in video_data), 'entries': entries,
} }

View file

@ -16,6 +16,8 @@
import errno import errno
import functools import functools
import gzip import gzip
import hashlib
import hmac
import imp import imp
import io import io
import itertools import itertools
@ -3290,6 +3292,14 @@ def platform_name():
return res return res
def get_windows_version():
''' Get Windows version. None if it's not running on Windows '''
if compat_os_name == 'nt':
return version_tuple(platform.win32_ver()[1])
else:
return None
def _windows_write_string(s, out): def _windows_write_string(s, out):
""" Returns True if the string was written using special methods, """ Returns True if the string was written using special methods,
False if it has yet to be written out.""" False if it has yet to be written out."""
@ -6375,9 +6385,20 @@ def variadic(x, allowed_types=(str, bytes)):
return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,) return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)
def get_windows_version(): # create a JSON Web Signature (jws) with HS256 algorithm
''' Get Windows version. None if it's not running on Windows ''' # the resulting format is in JWS Compact Serialization
if compat_os_name == 'nt': # implemented following JWT https://www.rfc-editor.org/rfc/rfc7519.html
return version_tuple(platform.win32_ver()[1]) # implemented following JWS https://www.rfc-editor.org/rfc/rfc7515.html
else: def jwt_encode_hs256(payload_data, key, headers={}):
return None header_data = {
'alg': 'HS256',
'typ': 'JWT',
}
if headers:
header_data.update(headers)
header_b64 = base64.b64encode(json.dumps(header_data).encode('utf-8'))
payload_b64 = base64.b64encode(json.dumps(payload_data).encode('utf-8'))
h = hmac.new(key.encode('utf-8'), header_b64 + b'.' + payload_b64, hashlib.sha256)
signature_b64 = base64.b64encode(h.digest())
token = header_b64 + b'.' + payload_b64 + b'.' + signature_b64
return token