mirror of
https://github.com/yt-dlp/yt-dlp.git
synced 2024-11-07 20:30:41 -05:00
[dependencies] Standardize Cryptodome
imports
This commit is contained in:
parent
754c84e2e4
commit
f6a765ceb5
9 changed files with 74 additions and 69 deletions
|
@ -26,7 +26,7 @@
|
||||||
key_expansion,
|
key_expansion,
|
||||||
pad_block,
|
pad_block,
|
||||||
)
|
)
|
||||||
from yt_dlp.dependencies import Cryptodome_AES
|
from yt_dlp.dependencies import Cryptodome
|
||||||
from yt_dlp.utils import bytes_to_intlist, intlist_to_bytes
|
from yt_dlp.utils import bytes_to_intlist, intlist_to_bytes
|
||||||
|
|
||||||
# the encrypted data can be generate with 'devscripts/generate_aes_testdata.py'
|
# the encrypted data can be generate with 'devscripts/generate_aes_testdata.py'
|
||||||
|
@ -48,7 +48,7 @@ def test_cbc_decrypt(self):
|
||||||
data = b'\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6\x27\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd'
|
data = b'\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6\x27\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd'
|
||||||
decrypted = intlist_to_bytes(aes_cbc_decrypt(bytes_to_intlist(data), self.key, self.iv))
|
decrypted = intlist_to_bytes(aes_cbc_decrypt(bytes_to_intlist(data), self.key, self.iv))
|
||||||
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
|
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
|
||||||
if Cryptodome_AES:
|
if Cryptodome:
|
||||||
decrypted = aes_cbc_decrypt_bytes(data, intlist_to_bytes(self.key), intlist_to_bytes(self.iv))
|
decrypted = aes_cbc_decrypt_bytes(data, intlist_to_bytes(self.key), intlist_to_bytes(self.iv))
|
||||||
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
|
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
|
||||||
|
|
||||||
|
@ -78,7 +78,7 @@ def test_gcm_decrypt(self):
|
||||||
decrypted = intlist_to_bytes(aes_gcm_decrypt_and_verify(
|
decrypted = intlist_to_bytes(aes_gcm_decrypt_and_verify(
|
||||||
bytes_to_intlist(data), self.key, bytes_to_intlist(authentication_tag), self.iv[:12]))
|
bytes_to_intlist(data), self.key, bytes_to_intlist(authentication_tag), self.iv[:12]))
|
||||||
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
|
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
|
||||||
if Cryptodome_AES:
|
if Cryptodome:
|
||||||
decrypted = aes_gcm_decrypt_and_verify_bytes(
|
decrypted = aes_gcm_decrypt_and_verify_bytes(
|
||||||
data, intlist_to_bytes(self.key), authentication_tag, intlist_to_bytes(self.iv[:12]))
|
data, intlist_to_bytes(self.key), authentication_tag, intlist_to_bytes(self.iv[:12]))
|
||||||
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
|
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
|
||||||
|
|
|
@ -31,6 +31,9 @@ def test_compat_passthrough(self):
|
||||||
# TODO: Test submodule
|
# TODO: Test submodule
|
||||||
# compat.asyncio.events # Must not raise error
|
# compat.asyncio.events # Must not raise error
|
||||||
|
|
||||||
|
with self.assertWarns(DeprecationWarning):
|
||||||
|
compat.compat_pycrypto_AES # Must not raise error
|
||||||
|
|
||||||
def test_compat_expanduser(self):
|
def test_compat_expanduser(self):
|
||||||
old_home = os.environ.get('HOME')
|
old_home = os.environ.get('HOME')
|
||||||
test_str = R'C:\Documents and Settings\тест\Application Data'
|
test_str = R'C:\Documents and Settings\тест\Application Data'
|
||||||
|
|
|
@ -2,17 +2,17 @@
|
||||||
from math import ceil
|
from math import ceil
|
||||||
|
|
||||||
from .compat import compat_ord
|
from .compat import compat_ord
|
||||||
from .dependencies import Cryptodome_AES
|
from .dependencies import Cryptodome
|
||||||
from .utils import bytes_to_intlist, intlist_to_bytes
|
from .utils import bytes_to_intlist, intlist_to_bytes
|
||||||
|
|
||||||
if Cryptodome_AES:
|
if Cryptodome:
|
||||||
def aes_cbc_decrypt_bytes(data, key, iv):
|
def aes_cbc_decrypt_bytes(data, key, iv):
|
||||||
""" Decrypt bytes with AES-CBC using pycryptodome """
|
""" Decrypt bytes with AES-CBC using pycryptodome """
|
||||||
return Cryptodome_AES.new(key, Cryptodome_AES.MODE_CBC, iv).decrypt(data)
|
return Cryptodome.Cipher.AES.new(key, Cryptodome.Cipher.AES.MODE_CBC, iv).decrypt(data)
|
||||||
|
|
||||||
def aes_gcm_decrypt_and_verify_bytes(data, key, tag, nonce):
|
def aes_gcm_decrypt_and_verify_bytes(data, key, tag, nonce):
|
||||||
""" Decrypt bytes with AES-GCM using pycryptodome """
|
""" Decrypt bytes with AES-GCM using pycryptodome """
|
||||||
return Cryptodome_AES.new(key, Cryptodome_AES.MODE_GCM, nonce).decrypt_and_verify(data, tag)
|
return Cryptodome.Cipher.AES.new(key, Cryptodome.Cipher.AES.MODE_GCM, nonce).decrypt_and_verify(data, tag)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
def aes_cbc_decrypt_bytes(data, key, iv):
|
def aes_cbc_decrypt_bytes(data, key, iv):
|
||||||
|
|
|
@ -10,16 +10,12 @@
|
||||||
|
|
||||||
|
|
||||||
def get_package_info(module):
|
def get_package_info(module):
|
||||||
parent = module.__name__.split('.')[0]
|
return _Package(
|
||||||
parent_module = None
|
name=getattr(module, '_yt_dlp__identifier', module.__name__),
|
||||||
with contextlib.suppress(ImportError):
|
version=str(next(filter(None, (
|
||||||
parent_module = importlib.import_module(parent)
|
getattr(module, attr, None)
|
||||||
|
for attr in ('__version__', 'version_string', 'version')
|
||||||
for attr in ('__version__', 'version_string', 'version'):
|
)), None)))
|
||||||
version = getattr(parent_module, attr, None)
|
|
||||||
if version is not None:
|
|
||||||
break
|
|
||||||
return _Package(getattr(module, '_yt_dlp__identifier', parent), str(version))
|
|
||||||
|
|
||||||
|
|
||||||
def _is_package(module):
|
def _is_package(module):
|
||||||
|
|
38
yt_dlp/dependencies/Cryptodome.py
Normal file
38
yt_dlp/dependencies/Cryptodome.py
Normal file
|
@ -0,0 +1,38 @@
|
||||||
|
import importlib
|
||||||
|
|
||||||
|
from ..compat import functools
|
||||||
|
from ..compat.compat_utils import EnhancedModule, passthrough_module
|
||||||
|
|
||||||
|
EnhancedModule(__name__)
|
||||||
|
|
||||||
|
try:
|
||||||
|
import Cryptodome as _parent
|
||||||
|
except ImportError:
|
||||||
|
try:
|
||||||
|
import Crypto as _parent
|
||||||
|
except (ImportError, SyntaxError): # Old Crypto gives SyntaxError in newer Python
|
||||||
|
_parent = EnhancedModule('Cryptodome')
|
||||||
|
__bool__ = lambda: False
|
||||||
|
|
||||||
|
|
||||||
|
@functools.cache
|
||||||
|
def __getattr__(name):
|
||||||
|
try:
|
||||||
|
submodule = importlib.import_module(f'.{name}', _parent.__name__)
|
||||||
|
except ImportError:
|
||||||
|
return getattr(_parent, name)
|
||||||
|
return passthrough_module(f'{__name__}.{name}', submodule)
|
||||||
|
|
||||||
|
|
||||||
|
@property
|
||||||
|
@functools.cache
|
||||||
|
def _yt_dlp__identifier():
|
||||||
|
if _parent.__name__ == 'Crypto':
|
||||||
|
from Crypto.Cipher import AES
|
||||||
|
try:
|
||||||
|
# In pycrypto, mode defaults to ECB. See:
|
||||||
|
# https://www.pycryptodome.org/en/latest/src/vs_pycrypto.html#:~:text=not%20have%20ECB%20as%20default%20mode
|
||||||
|
AES.new(b'abcdefghijklmnop')
|
||||||
|
except TypeError:
|
||||||
|
return 'pycrypto'
|
||||||
|
return _parent.__name__
|
|
@ -23,24 +23,6 @@
|
||||||
certifi = None
|
certifi = None
|
||||||
|
|
||||||
|
|
||||||
try:
|
|
||||||
from Cryptodome.Cipher import AES as Cryptodome_AES
|
|
||||||
except ImportError:
|
|
||||||
try:
|
|
||||||
from Crypto.Cipher import AES as Cryptodome_AES
|
|
||||||
except (ImportError, SyntaxError): # Old Crypto gives SyntaxError in newer Python
|
|
||||||
Cryptodome_AES = None
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
# In pycrypto, mode defaults to ECB. See:
|
|
||||||
# https://www.pycryptodome.org/en/latest/src/vs_pycrypto.html#:~:text=not%20have%20ECB%20as%20default%20mode
|
|
||||||
Cryptodome_AES.new(b'abcdefghijklmnop')
|
|
||||||
except TypeError:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
Cryptodome_AES._yt_dlp__identifier = 'pycrypto'
|
|
||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import mutagen
|
import mutagen
|
||||||
except ImportError:
|
except ImportError:
|
||||||
|
@ -84,12 +66,16 @@
|
||||||
xattr._yt_dlp__identifier = 'pyxattr'
|
xattr._yt_dlp__identifier = 'pyxattr'
|
||||||
|
|
||||||
|
|
||||||
|
from . import Cryptodome
|
||||||
|
|
||||||
all_dependencies = {k: v for k, v in globals().items() if not k.startswith('_')}
|
all_dependencies = {k: v for k, v in globals().items() if not k.startswith('_')}
|
||||||
|
|
||||||
|
|
||||||
available_dependencies = {k: v for k, v in all_dependencies.items() if v}
|
available_dependencies = {k: v for k, v in all_dependencies.items() if v}
|
||||||
|
|
||||||
|
|
||||||
|
# Deprecated
|
||||||
|
Cryptodome_AES = Cryptodome.Cipher.AES if Cryptodome else None
|
||||||
|
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
'all_dependencies',
|
'all_dependencies',
|
||||||
'available_dependencies',
|
'available_dependencies',
|
|
@ -7,7 +7,7 @@
|
||||||
from .external import FFmpegFD
|
from .external import FFmpegFD
|
||||||
from .fragment import FragmentFD
|
from .fragment import FragmentFD
|
||||||
from .. import webvtt
|
from .. import webvtt
|
||||||
from ..dependencies import Cryptodome_AES
|
from ..dependencies import Cryptodome
|
||||||
from ..utils import bug_reports_message, parse_m3u8_attributes, update_url_query
|
from ..utils import bug_reports_message, parse_m3u8_attributes, update_url_query
|
||||||
|
|
||||||
|
|
||||||
|
@ -63,7 +63,7 @@ def real_download(self, filename, info_dict):
|
||||||
can_download, message = self.can_download(s, info_dict, self.params.get('allow_unplayable_formats')), None
|
can_download, message = self.can_download(s, info_dict, self.params.get('allow_unplayable_formats')), None
|
||||||
if can_download:
|
if can_download:
|
||||||
has_ffmpeg = FFmpegFD.available()
|
has_ffmpeg = FFmpegFD.available()
|
||||||
no_crypto = not Cryptodome_AES and '#EXT-X-KEY:METHOD=AES-128' in s
|
no_crypto = not Cryptodome and '#EXT-X-KEY:METHOD=AES-128' in s
|
||||||
if no_crypto and has_ffmpeg:
|
if no_crypto and has_ffmpeg:
|
||||||
can_download, message = False, 'The stream has AES-128 encryption and pycryptodomex is not available'
|
can_download, message = False, 'The stream has AES-128 encryption and pycryptodomex is not available'
|
||||||
elif no_crypto:
|
elif no_crypto:
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
import urllib.parse
|
import urllib.parse
|
||||||
|
|
||||||
from .common import InfoExtractor, SearchInfoExtractor
|
from .common import InfoExtractor, SearchInfoExtractor
|
||||||
|
from ..dependencies import Cryptodome
|
||||||
from ..utils import (
|
from ..utils import (
|
||||||
ExtractorError,
|
ExtractorError,
|
||||||
GeoRestrictedError,
|
GeoRestrictedError,
|
||||||
|
@ -893,22 +894,15 @@ def _parse_video_metadata(self, video_data):
|
||||||
}
|
}
|
||||||
|
|
||||||
def _perform_login(self, username, password):
|
def _perform_login(self, username, password):
|
||||||
try:
|
if not Cryptodome:
|
||||||
from Cryptodome.PublicKey import RSA
|
|
||||||
from Cryptodome.Cipher import PKCS1_v1_5
|
|
||||||
except ImportError:
|
|
||||||
try:
|
|
||||||
from Crypto.PublicKey import RSA
|
|
||||||
from Crypto.Cipher import PKCS1_v1_5
|
|
||||||
except ImportError:
|
|
||||||
raise ExtractorError('pycryptodomex not found. Please install', expected=True)
|
raise ExtractorError('pycryptodomex not found. Please install', expected=True)
|
||||||
|
|
||||||
key_data = self._download_json(
|
key_data = self._download_json(
|
||||||
'https://passport.bilibili.tv/x/intl/passport-login/web/key?lang=en-US', None,
|
'https://passport.bilibili.tv/x/intl/passport-login/web/key?lang=en-US', None,
|
||||||
note='Downloading login key', errnote='Unable to download login key')['data']
|
note='Downloading login key', errnote='Unable to download login key')['data']
|
||||||
|
|
||||||
public_key = RSA.importKey(key_data['key'])
|
public_key = Cryptodome.PublicKey.RSA.importKey(key_data['key'])
|
||||||
password_hash = PKCS1_v1_5.new(public_key).encrypt((key_data['hash'] + password).encode('utf-8'))
|
password_hash = Cryptodome.Cipher.PKCS1_v1_5.new(public_key).encrypt((key_data['hash'] + password).encode('utf-8'))
|
||||||
login_post = self._download_json(
|
login_post = self._download_json(
|
||||||
'https://passport.bilibili.tv/x/intl/passport-login/web/login/password?lang=en-US', None, data=urlencode_postdata({
|
'https://passport.bilibili.tv/x/intl/passport-login/web/login/password?lang=en-US', None, data=urlencode_postdata({
|
||||||
'username': username,
|
'username': username,
|
||||||
|
|
|
@ -2,11 +2,8 @@
|
||||||
import re
|
import re
|
||||||
|
|
||||||
from .common import InfoExtractor
|
from .common import InfoExtractor
|
||||||
from ..utils import (
|
from ..dependencies import Cryptodome
|
||||||
ExtractorError,
|
from ..utils import ExtractorError, int_or_none, qualities
|
||||||
int_or_none,
|
|
||||||
qualities,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class IviIE(InfoExtractor):
|
class IviIE(InfoExtractor):
|
||||||
|
@ -94,17 +91,7 @@ def _real_extract(self, url):
|
||||||
for site in (353, 183):
|
for site in (353, 183):
|
||||||
content_data = (data % site).encode()
|
content_data = (data % site).encode()
|
||||||
if site == 353:
|
if site == 353:
|
||||||
try:
|
if not Cryptodome:
|
||||||
from Cryptodome.Cipher import Blowfish
|
|
||||||
from Cryptodome.Hash import CMAC
|
|
||||||
pycryptodome_found = True
|
|
||||||
except ImportError:
|
|
||||||
try:
|
|
||||||
from Crypto.Cipher import Blowfish
|
|
||||||
from Crypto.Hash import CMAC
|
|
||||||
pycryptodome_found = True
|
|
||||||
except ImportError:
|
|
||||||
pycryptodome_found = False
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
timestamp = (self._download_json(
|
timestamp = (self._download_json(
|
||||||
|
@ -118,7 +105,8 @@ def _real_extract(self, url):
|
||||||
|
|
||||||
query = {
|
query = {
|
||||||
'ts': timestamp,
|
'ts': timestamp,
|
||||||
'sign': CMAC.new(self._LIGHT_KEY, timestamp.encode() + content_data, Blowfish).hexdigest(),
|
'sign': Cryptodome.Hash.CMAC.new(self._LIGHT_KEY, timestamp.encode() + content_data,
|
||||||
|
Cryptodome.Cipher.Blowfish).hexdigest(),
|
||||||
}
|
}
|
||||||
else:
|
else:
|
||||||
query = {}
|
query = {}
|
||||||
|
@ -138,7 +126,7 @@ def _real_extract(self, url):
|
||||||
extractor_msg = 'Video %s does not exist'
|
extractor_msg = 'Video %s does not exist'
|
||||||
elif site == 353:
|
elif site == 353:
|
||||||
continue
|
continue
|
||||||
elif not pycryptodome_found:
|
elif not Cryptodome:
|
||||||
raise ExtractorError('pycryptodomex not found. Please install', expected=True)
|
raise ExtractorError('pycryptodomex not found. Please install', expected=True)
|
||||||
elif message:
|
elif message:
|
||||||
extractor_msg += ': ' + message
|
extractor_msg += ': ' + message
|
||||||
|
|
Loading…
Reference in a new issue