import abc
import base64
import contextlib
import functools
import json
import os
import random
import ssl
import threading
from http.server import BaseHTTPRequestHandler
from socketserver import ThreadingTCPServer

import pytest

from test.helper import http_server_port, verify_address_availability
from test.test_networking import TEST_DIR
from test.test_socks import IPv6ThreadingTCPServer
from yt_dlp.dependencies import urllib3
from yt_dlp.networking import Request
from yt_dlp.networking.exceptions import HTTPError, ProxyError, SSLError


class HTTPProxyAuthMixin:

    def proxy_auth_error(self):
        self.send_response(407)
        self.send_header('Proxy-Authenticate', 'Basic realm="test http proxy"')
        self.end_headers()
        return False

    def do_proxy_auth(self, username, password):
        if username is None and password is None:
            return True

        proxy_auth_header = self.headers.get('Proxy-Authorization', None)
        if proxy_auth_header is None:
            return self.proxy_auth_error()

        if not proxy_auth_header.startswith('Basic '):
            return self.proxy_auth_error()

        auth = proxy_auth_header[6:]

        try:
            auth_username, auth_password = base64.b64decode(auth).decode().split(':', 1)
        except Exception:
            return self.proxy_auth_error()

        if auth_username != (username or '') or auth_password != (password or ''):
            return self.proxy_auth_error()
        return True


class HTTPProxyHandler(BaseHTTPRequestHandler, HTTPProxyAuthMixin):
    def __init__(self, *args, proxy_info=None, username=None, password=None, request_handler=None, **kwargs):
        self.username = username
        self.password = password
        self.proxy_info = proxy_info
        super().__init__(*args, **kwargs)

    def do_GET(self):
        if not self.do_proxy_auth(self.username, self.password):
            self.server.close_request(self.request)
            return
        if self.path.endswith('/proxy_info'):
            payload = json.dumps(self.proxy_info or {
                'client_address': self.client_address,
                'connect': False,
                'connect_host': None,
                'connect_port': None,
                'headers': dict(self.headers),
                'path': self.path,
                'proxy': ':'.join(str(y) for y in self.connection.getsockname()),
            })
            self.send_response(200)
            self.send_header('Content-Type', 'application/json; charset=utf-8')
            self.send_header('Content-Length', str(len(payload)))
            self.end_headers()
            self.wfile.write(payload.encode())
        else:
            self.send_response(404)
            self.end_headers()

        self.server.close_request(self.request)


if urllib3:
    import urllib3.util.ssltransport

    class SSLTransport(urllib3.util.ssltransport.SSLTransport):
        """
        Modified version of urllib3 SSLTransport to support server side SSL

        This allows us to chain multiple TLS connections.
        """

        def __init__(self, socket, ssl_context, server_hostname=None, suppress_ragged_eofs=True, server_side=False):
            self.incoming = ssl.MemoryBIO()
            self.outgoing = ssl.MemoryBIO()

            self.suppress_ragged_eofs = suppress_ragged_eofs
            self.socket = socket

            self.sslobj = ssl_context.wrap_bio(
                self.incoming,
                self.outgoing,
                server_hostname=server_hostname,
                server_side=server_side,
            )
            self._ssl_io_loop(self.sslobj.do_handshake)

        @property
        def _io_refs(self):
            return self.socket._io_refs

        @_io_refs.setter
        def _io_refs(self, value):
            self.socket._io_refs = value

        def shutdown(self, *args, **kwargs):
            self.socket.shutdown(*args, **kwargs)
else:
    SSLTransport = None


class HTTPSProxyHandler(HTTPProxyHandler):
    def __init__(self, request, *args, **kwargs):
        certfn = os.path.join(TEST_DIR, 'testcert.pem')
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        sslctx.load_cert_chain(certfn, None)
        if isinstance(request, ssl.SSLSocket):
            request = SSLTransport(request, ssl_context=sslctx, server_side=True)
        else:
            request = sslctx.wrap_socket(request, server_side=True)
        super().__init__(request, *args, **kwargs)


class HTTPConnectProxyHandler(BaseHTTPRequestHandler, HTTPProxyAuthMixin):
    protocol_version = 'HTTP/1.1'
    default_request_version = 'HTTP/1.1'

    def __init__(self, *args, username=None, password=None, request_handler=None, **kwargs):
        self.username = username
        self.password = password
        self.request_handler = request_handler
        super().__init__(*args, **kwargs)

    def do_CONNECT(self):
        if not self.do_proxy_auth(self.username, self.password):
            self.server.close_request(self.request)
            return
        self.send_response(200)
        self.end_headers()
        proxy_info = {
            'client_address': self.client_address,
            'connect': True,
            'connect_host': self.path.split(':')[0],
            'connect_port': int(self.path.split(':')[1]),
            'headers': dict(self.headers),
            'path': self.path,
            'proxy': ':'.join(str(y) for y in self.connection.getsockname()),
        }
        self.request_handler(self.request, self.client_address, self.server, proxy_info=proxy_info)
        self.server.close_request(self.request)


class HTTPSConnectProxyHandler(HTTPConnectProxyHandler):
    def __init__(self, request, *args, **kwargs):
        certfn = os.path.join(TEST_DIR, 'testcert.pem')
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        sslctx.load_cert_chain(certfn, None)
        request = sslctx.wrap_socket(request, server_side=True)
        self._original_request = request
        super().__init__(request, *args, **kwargs)

    def do_CONNECT(self):
        super().do_CONNECT()
        self.server.close_request(self._original_request)


@contextlib.contextmanager
def proxy_server(proxy_server_class, request_handler, bind_ip=None, **proxy_server_kwargs):
    server = server_thread = None
    try:
        bind_address = bind_ip or '127.0.0.1'
        server_type = ThreadingTCPServer if '.' in bind_address else IPv6ThreadingTCPServer
        server = server_type(
            (bind_address, 0), functools.partial(proxy_server_class, request_handler=request_handler, **proxy_server_kwargs))
        server_port = http_server_port(server)
        server_thread = threading.Thread(target=server.serve_forever)
        server_thread.daemon = True
        server_thread.start()
        if '.' not in bind_address:
            yield f'[{bind_address}]:{server_port}'
        else:
            yield f'{bind_address}:{server_port}'
    finally:
        server.shutdown()
        server.server_close()
        server_thread.join(2.0)


class HTTPProxyTestContext(abc.ABC):
    REQUEST_HANDLER_CLASS = None
    REQUEST_PROTO = None

    def http_server(self, server_class, *args, **kwargs):
        return proxy_server(server_class, self.REQUEST_HANDLER_CLASS, *args, **kwargs)

    @abc.abstractmethod
    def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs) -> dict:
        """return a dict of proxy_info"""


class HTTPProxyHTTPTestContext(HTTPProxyTestContext):
    # Standard HTTP Proxy for http requests
    REQUEST_HANDLER_CLASS = HTTPProxyHandler
    REQUEST_PROTO = 'http'

    def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
        request = Request(f'http://{target_domain or "127.0.0.1"}:{target_port or "40000"}/proxy_info', **req_kwargs)
        handler.validate(request)
        return json.loads(handler.send(request).read().decode())


class HTTPProxyHTTPSTestContext(HTTPProxyTestContext):
    # HTTP Connect proxy, for https requests
    REQUEST_HANDLER_CLASS = HTTPSProxyHandler
    REQUEST_PROTO = 'https'

    def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
        request = Request(f'https://{target_domain or "127.0.0.1"}:{target_port or "40000"}/proxy_info', **req_kwargs)
        handler.validate(request)
        return json.loads(handler.send(request).read().decode())


CTX_MAP = {
    'http': HTTPProxyHTTPTestContext,
    'https': HTTPProxyHTTPSTestContext,
}


@pytest.fixture(scope='module')
def ctx(request):
    return CTX_MAP[request.param]()


@pytest.mark.parametrize(
    'handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
@pytest.mark.parametrize('ctx', ['http'], indirect=True)  # pure http proxy can only support http
class TestHTTPProxy:
    def test_http_no_auth(self, handler, ctx):
        with ctx.http_server(HTTPProxyHandler) as server_address:
            with handler(proxies={ctx.REQUEST_PROTO: f'http://{server_address}'}) as rh:
                proxy_info = ctx.proxy_info_request(rh)
                assert proxy_info['proxy'] == server_address
                assert proxy_info['connect'] is False
                assert 'Proxy-Authorization' not in proxy_info['headers']

    def test_http_auth(self, handler, ctx):
        with ctx.http_server(HTTPProxyHandler, username='test', password='test') as server_address:
            with handler(proxies={ctx.REQUEST_PROTO: f'http://test:test@{server_address}'}) as rh:
                proxy_info = ctx.proxy_info_request(rh)
                assert proxy_info['proxy'] == server_address
                assert 'Proxy-Authorization' in proxy_info['headers']

    def test_http_bad_auth(self, handler, ctx):
        with ctx.http_server(HTTPProxyHandler, username='test', password='test') as server_address:
            with handler(proxies={ctx.REQUEST_PROTO: f'http://test:bad@{server_address}'}) as rh:
                with pytest.raises(HTTPError) as exc_info:
                    ctx.proxy_info_request(rh)
                assert exc_info.value.response.status == 407
                exc_info.value.response.close()

    def test_http_source_address(self, handler, ctx):
        with ctx.http_server(HTTPProxyHandler) as server_address:
            source_address = f'127.0.0.{random.randint(5, 255)}'
            verify_address_availability(source_address)
            with handler(proxies={ctx.REQUEST_PROTO: f'http://{server_address}'},
                         source_address=source_address) as rh:
                proxy_info = ctx.proxy_info_request(rh)
                assert proxy_info['proxy'] == server_address
                assert proxy_info['client_address'][0] == source_address

    @pytest.mark.skip_handler('Urllib', 'urllib does not support https proxies')
    def test_https(self, handler, ctx):
        with ctx.http_server(HTTPSProxyHandler) as server_address:
            with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh:
                proxy_info = ctx.proxy_info_request(rh)
                assert proxy_info['proxy'] == server_address
                assert proxy_info['connect'] is False
                assert 'Proxy-Authorization' not in proxy_info['headers']

    @pytest.mark.skip_handler('Urllib', 'urllib does not support https proxies')
    def test_https_verify_failed(self, handler, ctx):
        with ctx.http_server(HTTPSProxyHandler) as server_address:
            with handler(verify=True, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh:
                # Accept SSLError as may not be feasible to tell if it is proxy or request error.
                # note: if request proto also does ssl verification, this may also be the error of the request.
                # Until we can support passing custom cacerts to handlers, we cannot properly test this for all cases.
                with pytest.raises((ProxyError, SSLError)):
                    ctx.proxy_info_request(rh)

    def test_http_with_idn(self, handler, ctx):
        with ctx.http_server(HTTPProxyHandler) as server_address:
            with handler(proxies={ctx.REQUEST_PROTO: f'http://{server_address}'}) as rh:
                proxy_info = ctx.proxy_info_request(rh, target_domain='中文.tw')
                assert proxy_info['proxy'] == server_address
                assert proxy_info['path'].startswith('http://xn--fiq228c.tw')
                assert proxy_info['headers']['Host'].split(':', 1)[0] == 'xn--fiq228c.tw'


@pytest.mark.parametrize(
    'handler,ctx', [
        ('Requests', 'https'),
        ('CurlCFFI', 'https'),
    ], indirect=True)
class TestHTTPConnectProxy:
    def test_http_connect_no_auth(self, handler, ctx):
        with ctx.http_server(HTTPConnectProxyHandler) as server_address:
            with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'http://{server_address}'}) as rh:
                proxy_info = ctx.proxy_info_request(rh)
                assert proxy_info['proxy'] == server_address
                assert proxy_info['connect'] is True
                assert 'Proxy-Authorization' not in proxy_info['headers']

    def test_http_connect_auth(self, handler, ctx):
        with ctx.http_server(HTTPConnectProxyHandler, username='test', password='test') as server_address:
            with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'http://test:test@{server_address}'}) as rh:
                proxy_info = ctx.proxy_info_request(rh)
                assert proxy_info['proxy'] == server_address
                assert 'Proxy-Authorization' in proxy_info['headers']

    @pytest.mark.skip_handler(
        'Requests',
        'bug in urllib3 causes unclosed socket: https://github.com/urllib3/urllib3/issues/3374',
    )
    def test_http_connect_bad_auth(self, handler, ctx):
        with ctx.http_server(HTTPConnectProxyHandler, username='test', password='test') as server_address:
            with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'http://test:bad@{server_address}'}) as rh:
                with pytest.raises(ProxyError):
                    ctx.proxy_info_request(rh)

    def test_http_connect_source_address(self, handler, ctx):
        with ctx.http_server(HTTPConnectProxyHandler) as server_address:
            source_address = f'127.0.0.{random.randint(5, 255)}'
            verify_address_availability(source_address)
            with handler(proxies={ctx.REQUEST_PROTO: f'http://{server_address}'},
                         source_address=source_address,
                         verify=False) as rh:
                proxy_info = ctx.proxy_info_request(rh)
                assert proxy_info['proxy'] == server_address
                assert proxy_info['client_address'][0] == source_address

    @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
    def test_https_connect_proxy(self, handler, ctx):
        with ctx.http_server(HTTPSConnectProxyHandler) as server_address:
            with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh:
                proxy_info = ctx.proxy_info_request(rh)
                assert proxy_info['proxy'] == server_address
                assert proxy_info['connect'] is True
                assert 'Proxy-Authorization' not in proxy_info['headers']

    @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
    def test_https_connect_verify_failed(self, handler, ctx):
        with ctx.http_server(HTTPSConnectProxyHandler) as server_address:
            with handler(verify=True, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh:
                # Accept SSLError as may not be feasible to tell if it is proxy or request error.
                # note: if request proto also does ssl verification, this may also be the error of the request.
                # Until we can support passing custom cacerts to handlers, we cannot properly test this for all cases.
                with pytest.raises((ProxyError, SSLError)):
                    ctx.proxy_info_request(rh)

    @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
    def test_https_connect_proxy_auth(self, handler, ctx):
        with ctx.http_server(HTTPSConnectProxyHandler, username='test', password='test') as server_address:
            with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'https://test:test@{server_address}'}) as rh:
                proxy_info = ctx.proxy_info_request(rh)
                assert proxy_info['proxy'] == server_address
                assert 'Proxy-Authorization' in proxy_info['headers']