[core] Change how Cookie headers are handled

Cookies are now saved and loaded under `cookies` key in the info dict
instead of `http_headers.Cookie`. Cookies passed in headers are
auto-scoped to the input URLs with a warning.

Ref: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj

Authored by: Grub4K
This commit is contained in:
Simon Sawicki 2023-07-06 21:51:04 +05:30 committed by pukkandan
parent f8b4bcc0a7
commit 3121512228
No known key found for this signature in database
GPG key ID: 7EEE9E1E817D0A39
3 changed files with 139 additions and 4 deletions

View file

@ -1213,6 +1213,62 @@ def _real_extract(self, url):
self.assertEqual(downloaded['extractor'], 'Video')
self.assertEqual(downloaded['extractor_key'], 'Video')
def test_header_cookies(self):
from http.cookiejar import Cookie
ydl = FakeYDL()
ydl.report_warning = lambda *_, **__: None
def cookie(name, value, version=None, domain='', path='', secure=False, expires=None):
return Cookie(
version or 0, name, value, None, False,
domain, bool(domain), bool(domain), path, bool(path),
secure, expires, False, None, None, rest={})
_test_url = 'https://yt.dlp/test'
def test(encoded_cookies, cookies, headers=False, round_trip=None, error=None):
def _test():
ydl.cookiejar.clear()
ydl._load_cookies(encoded_cookies, from_headers=headers)
if headers:
ydl._apply_header_cookies(_test_url)
data = {'url': _test_url}
ydl._calc_headers(data)
self.assertCountEqual(
map(vars, ydl.cookiejar), map(vars, cookies),
'Extracted cookiejar.Cookie is not the same')
if not headers:
self.assertEqual(
data.get('cookies'), round_trip or encoded_cookies,
'Cookie is not the same as round trip')
ydl.__dict__['_YoutubeDL__header_cookies'] = []
with self.subTest(msg=encoded_cookies):
if not error:
_test()
return
with self.assertRaisesRegex(Exception, error):
_test()
test('test=value; Domain=.yt.dlp', [cookie('test', 'value', domain='.yt.dlp')])
test('test=value', [cookie('test', 'value')], error='Unscoped cookies are not allowed')
test('cookie1=value1; Domain=.yt.dlp; Path=/test; cookie2=value2; Domain=.yt.dlp; Path=/', [
cookie('cookie1', 'value1', domain='.yt.dlp', path='/test'),
cookie('cookie2', 'value2', domain='.yt.dlp', path='/')])
test('test=value; Domain=.yt.dlp; Path=/test; Secure; Expires=9999999999', [
cookie('test', 'value', domain='.yt.dlp', path='/test', secure=True, expires=9999999999)])
test('test="value; "; path=/test; domain=.yt.dlp', [
cookie('test', 'value; ', domain='.yt.dlp', path='/test')],
round_trip='test="value\\073 "; Domain=.yt.dlp; Path=/test')
test('name=; Domain=.yt.dlp', [cookie('name', '', domain='.yt.dlp')],
round_trip='name=""; Domain=.yt.dlp')
test('test=value', [cookie('test', 'value', domain='.yt.dlp')], headers=True)
test('cookie1=value; Domain=.yt.dlp; cookie2=value', [], headers=True, error='Invalid syntax')
ydl.deprecated_feature = ydl.report_error
test('test=value', [], headers=True, error='Passing cookies as a header is a potential security risk')
if __name__ == '__main__':
unittest.main()

View file

@ -1,9 +1,11 @@
import collections
import contextlib
import copy
import datetime
import errno
import fileinput
import functools
import http.cookiejar
import io
import itertools
import json
@ -25,7 +27,7 @@
from .cache import Cache
from .compat import urllib # isort: split
from .compat import compat_os_name, compat_shlex_quote
from .cookies import load_cookies
from .cookies import LenientSimpleCookie, load_cookies
from .downloader import FFmpegFD, get_suitable_downloader, shorten_protocol_name
from .downloader.rtmp import rtmpdump_version
from .extractor import gen_extractor_classes, get_info_extractor
@ -673,6 +675,9 @@ def process_color_policy(stream):
if auto_init and auto_init != 'no_verbose_header':
self.print_debug_header()
self.__header_cookies = []
self._load_cookies(traverse_obj(self.params.get('http_headers'), 'cookie', casesense=False)) # compat
def check_deprecated(param, option, suggestion):
if self.params.get(param) is not None:
self.report_warning(f'{option} is deprecated. Use {suggestion} instead')
@ -1625,8 +1630,60 @@ def progress(msg):
self.to_screen('')
raise
def _load_cookies(self, data, *, from_headers=True):
"""Loads cookies from a `Cookie` header
This tries to work around the security vulnerability of passing cookies to every domain.
See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj
The unscoped cookies are saved for later to be stored in the jar with a limited scope.
@param data The Cookie header as string to load the cookies from
@param from_headers If `False`, allows Set-Cookie syntax in the cookie string (at least a domain will be required)
"""
for cookie in LenientSimpleCookie(data).values():
if from_headers and any(cookie.values()):
raise ValueError('Invalid syntax in Cookie Header')
domain = cookie.get('domain') or ''
expiry = cookie.get('expires')
if expiry == '': # 0 is valid
expiry = None
prepared_cookie = http.cookiejar.Cookie(
cookie.get('version') or 0, cookie.key, cookie.value, None, False,
domain, True, True, cookie.get('path') or '', bool(cookie.get('path')),
cookie.get('secure') or False, expiry, False, None, None, {})
if domain:
self.cookiejar.set_cookie(prepared_cookie)
elif from_headers:
self.deprecated_feature(
'Passing cookies as a header is a potential security risk; '
'they will be scoped to the domain of the downloaded urls. '
'Please consider loading cookies from a file or browser instead.')
self.__header_cookies.append(prepared_cookie)
else:
self.report_error('Unscoped cookies are not allowed; please specify some sort of scoping',
tb=False, is_error=False)
def _apply_header_cookies(self, url):
"""Applies stray header cookies to the provided url
This loads header cookies and scopes them to the domain provided in `url`.
While this is not ideal, it helps reduce the risk of them being sent
to an unintended destination while mostly maintaining compatibility.
"""
parsed = urllib.parse.urlparse(url)
if not parsed.hostname:
return
for cookie in map(copy.copy, self.__header_cookies):
cookie.domain = f'.{parsed.hostname}'
self.cookiejar.set_cookie(cookie)
@_handle_extraction_exceptions
def __extract_info(self, url, ie, download, extra_info, process):
self._apply_header_cookies(url)
try:
ie_result = ie.extract(url)
except UserNotLive as e:
@ -2414,9 +2471,24 @@ def _calc_headers(self, info_dict):
if 'Youtubedl-No-Compression' in res: # deprecated
res.pop('Youtubedl-No-Compression', None)
res['Accept-Encoding'] = 'identity'
cookies = self.cookiejar.get_cookie_header(info_dict['url'])
cookies = self.cookiejar.get_cookies_for_url(info_dict['url'])
if cookies:
res['Cookie'] = cookies
encoder = LenientSimpleCookie()
values = []
for cookie in cookies:
_, value = encoder.value_encode(cookie.value)
values.append(f'{cookie.name}={value}')
if cookie.domain:
values.append(f'Domain={cookie.domain}')
if cookie.path:
values.append(f'Path={cookie.path}')
if cookie.secure:
values.append('Secure')
if cookie.expires:
values.append(f'Expires={cookie.expires}')
if cookie.version:
values.append(f'Version={cookie.version}')
info_dict['cookies'] = '; '.join(values)
if 'X-Forwarded-For' not in res:
x_forwarded_for_ip = info_dict.get('__x_forwarded_for_ip')
@ -3423,6 +3495,8 @@ def download_with_info_file(self, info_filename):
infos = [self.sanitize_info(info, self.params.get('clean_infojson', True))
for info in variadic(json.loads('\n'.join(f)))]
for info in infos:
self._load_cookies(info.get('cookies'), from_headers=False)
self._load_cookies(traverse_obj(info.get('http_headers'), 'Cookie', casesense=False)) # compat
try:
self.__download_wrapper(self.process_ie_result)(info, download=True)
except (DownloadError, EntryNotInPlaylist, ReExtractInfo) as e:

View file

@ -32,6 +32,7 @@
timetuple_from_msec,
try_call,
)
from ..utils.traversal import traverse_obj
class FileDownloader:
@ -419,7 +420,6 @@ def download(self, filename, info_dict, subtitle=False):
"""Download to a filename using the info from info_dict
Return True on success and False otherwise
"""
nooverwrites_and_exists = (
not self.params.get('overwrites', True)
and os.path.exists(encodeFilename(filename))
@ -453,6 +453,11 @@ def download(self, filename, info_dict, subtitle=False):
self.to_screen(f'[download] Sleeping {sleep_interval:.2f} seconds ...')
time.sleep(sleep_interval)
# Filter the `Cookie` header from the info_dict to prevent leaks.
# See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj
info_dict['http_headers'] = dict(traverse_obj(info_dict, (
'http_headers', {dict.items}, lambda _, pair: pair[0].lower() != 'cookie'))) or None
ret = self.real_download(filename, info_dict)
self._finish_multiline_status()
return ret, True