HEX
Server: Apache
System: Linux server.enlacediseno.com 4.18.0-553.62.1.el8_10.x86_64 #1 SMP Wed Jul 16 04:08:25 EDT 2025 x86_64
User: maor (1069)
PHP: 7.3.33
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //usr/libexec/kcare/python/kcarectl/http_utils.py
# Copyright (c) Cloud Linux Software, Inc
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT

import errno
import os
import socket
import ssl
from ssl import SSLError

if False:  # pragma: no cover
    from typing import Optional, Union  # noqa: F401

from . import config, constants, errors, log_utils, utils
from .py23 import HTTPError, Request, URLError, httplib, std_urlopen, urlparse


def urlopen_base(url, *args, **kwargs):  # mocked: tests/unit
    """Open `url` once (no retries) with the KC version headers attached.

    :param url: URL string, or a request object exposing `get_full_url`
        and `add_header`. A plain string is wrapped in `Request`.
    :param args: positional arguments forwarded to `std_urlopen`.
    :param kwargs: keyword arguments forwarded to `std_urlopen`. An optional
        `headers` mapping is consumed here and added to the request; the
        caller's mapping is not modified.
    :return: the result of `std_urlopen`.
    :raises errors.NotFound: on HTTP 404, or when a URLError wraps an error
        whose `errno` is ENOENT.
    :raises HTTPError: any other HTTP error, re-raised unchanged.
    :raises URLError: any other URL error, with `reason` rewritten to
        include the requested URL and a `url` attribute added.
    """
    if hasattr(url, 'get_full_url'):
        request_url = url.get_full_url()
    else:
        request_url = url
        url = Request(url)

    # Work on a copy: updating the caller's dict in place would leak the
    # KC-* entries into an object the caller may reuse for other requests.
    headers = dict(kwargs.pop('headers', None) or {})
    headers.update(
        {
            'KC-Version': constants.VERSION,
            'KC-Patch-Version': constants.KC_PATCH_VERSION,
        }
    )
    for header, value in headers.items():
        url.add_header(header, value)

    log_utils.logdebug("Requesting url: `{0}`. Headers: {1}".format(request_url, headers))

    try:
        # add timeout exclude python 2.6
        if not constants.PY2_6 and 'timeout' not in kwargs:
            kwargs['timeout'] = config.HTTP_TIMEOUT
        if not config.CHECK_SSL_CERTS and getattr(ssl, 'HAS_SNI', None):  # pragma: no cover unit
            # Certificate checks are disabled by configuration: build a
            # context that skips hostname and certificate verification.
            ctx = ssl.create_default_context()
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
            kwargs['context'] = ctx
        # bandit warns about use of file: in urlopen which can happen here but is secure
        return std_urlopen(url, *args, **kwargs)  # nosec B310
    except HTTPError as ex:
        if ex.code == 404:
            raise errors.NotFound(ex.url, ex.code, ex.msg, ex.hdrs, ex.fp)
        # HTTPError is a URLError descendant and contains URL, raise it as is
        raise
    except URLError as ex:
        # Local patches OSError(No such file) should be interpreted as Not found(404)
        # It was done as a chain because when it implemented with "duck-typing" it will mess
        # with error context
        if ex.args and hasattr(ex.args[0], 'errno') and ex.args[0].errno == errno.ENOENT:
            raise errors.NotFound(url, 404, str(ex), None, None)  # type: ignore[arg-type]

        # there is no information about URL in the base URLError class, add it and raise
        ex.reason = 'Request for `{0}` failed: {1}'.format(request_url, ex)
        ex.url = request_url  # type: ignore[attr-defined]
        raise


def check_urlopen_retry_factory(retry_on_500=True):
    """Build a predicate that decides whether a failed request is retried.

    :param retry_on_500: when true, HTTP errors with code >= 500 are retried.
    :return: a function `(exception, state)` that returns a truthy value if
        the error is retryable. `state` is accepted but not used. Errors
        that match none of the rules yield `None`.
    """

    def check_function(e, state):
        # HTTPError must be tested first: it is a URLError subclass, and only
        # server-side (5xx) failures are retryable, subject to retry_on_500.
        if isinstance(e, HTTPError):
            return retry_on_500 and e.code >= 500

        # Transport-level failures are always retried.
        if isinstance(e, (URLError, httplib.HTTPException, SSLError, socket.timeout)):
            return True

        # SysCallError "Connection reset by peer" from PyOpenSSL
        looks_like_reset = (
            hasattr(e, 'args') and len(e.args) == 2 and e.args[0] == errno.ECONNRESET
        )
        if looks_like_reset:  # pragma: no cover unit
            return True

        return None

    return check_function


def is_local_url(url):
    """Tell whether `url` uses the `file:` scheme.

    :param url: URL string, or an object exposing `get_full_url`.
    :return: True if the full URL starts with `file:`, otherwise False.
    """
    full_url = url.get_full_url() if hasattr(url, 'get_full_url') else url
    return full_url.startswith('file:')


def urlopen(url, *args, **kwargs):
    """Open `url`, retrying failed requests unless the URL is local.

    :param url: URL string or request object, passed to `urlopen_base`.
    :param args: positional arguments forwarded to `urlopen_base`.
    :param kwargs: keyword arguments forwarded to `urlopen_base`, except
        `retry_on_500` (default True) and `retry_count`
        (default `constants.RETRY_COUNT`), which are consumed here.
    :return: the result of `urlopen_base`.
    """
    retry_on_500 = kwargs.pop('retry_on_500', True)
    retry_count = kwargs.pop('retry_count', constants.RETRY_COUNT)

    if not is_local_url(url):
        should_retry = check_urlopen_retry_factory(retry_on_500=retry_on_500)
        retrying_open = utils.retry(should_retry, count=retry_count)(urlopen_base)
        return retrying_open(url, *args, **kwargs)

    # `file:` URLs are opened directly, with no retry wrapper.
    return urlopen_base(url, *args, **kwargs)


def http_request(url, auth_string, auth_token=None, method=None):
    """Build a `Request` for `url`, adding credentials when applicable.

    No credential headers are added when `config.UPDATE_FROM_LOCAL` is set.

    :param url: URL to request.
    :param auth_string: value for a `Basic` Authorization header; skipped if falsy.
    :param auth_token: value for the `constants.AUTH_TOKEN_HEADER` header; skipped if falsy.
    :param method: HTTP method passed through to `Request`.
    :return: the prepared request object.
    """
    req = Request(url, method=method)
    if config.UPDATE_FROM_LOCAL:
        return req

    if auth_string:
        req.add_header('Authorization', 'Basic {0}'.format(auth_string))
    if auth_token:
        req.add_header(constants.AUTH_TOKEN_HEADER, auth_token)
    return req


def get_proxy_from_env(scheme):
    """Look up the proxy configured in the environment for `scheme`.

    The lowercase variable (`http_proxy` / `https_proxy`) is preferred; the
    uppercase one is used when the lowercase is unset or empty.

    :param scheme: `'http'` or `'https'`.
    :return: the proxy value, or None when nothing is set or the scheme is
        neither `'http'` nor `'https'`.
    """
    if scheme == 'http':
        lower_name, upper_name = 'http_proxy', 'HTTP_PROXY'
    elif scheme == 'https':
        lower_name, upper_name = 'https_proxy', 'HTTPS_PROXY'
    else:
        return None
    return os.getenv(lower_name) or os.getenv(upper_name)


def proxy_is_used():
    """Report whether a proxy is configured in the environment.

    :return: True if `get_proxy_from_env` yields a non-empty value for
        either `'http'` or `'https'`, otherwise False.
    """
    if get_proxy_from_env('http'):
        return True
    return bool(get_proxy_from_env('https'))


# Retry predicate built with the factory defaults (retry_on_500=True);
# passed to `utils.retry` in the `upload_file` decorator.
check_urlopen_retry = check_urlopen_retry_factory()


@utils.retry(check_retry=check_urlopen_retry)
def upload_file(file_path, upload_url, auth_string=None):
    # type: (str, str, Optional[str]) -> None
    """Upload a file to the given URL using HTTP PUT, streaming it from disk.

    Note: The standard library urllib doesn't support PUT with data,
    so httplib is used directly.

    The open file object is handed to the connection's `send`, so large
    files can be uploaded without loading the entire file into memory.

    :param file_path: Path to the file to upload
    :param upload_url: Full URL to upload the file to. Query params are ignored.
    :param auth_string: Optional authentication string for Basic Auth
    :return: None if upload succeeded
    :raises HTTPError: If upload fails with HTTP status >= 400
    :raises ValueError: If the file is empty, or the URL has no hostname
        or uses a scheme other than http/https
    """
    file_size = os.path.getsize(file_path)
    if not file_size:
        raise ValueError('Refusing to upload empty file: {0}'.format(file_path))

    parsed = urlparse(utils.nstr(upload_url))
    host = parsed.hostname
    port = parsed.port
    # Only the path component is sent; the query string is dropped on purpose.
    url_path = parsed.path or '/'

    if host is None:
        raise ValueError('Invalid URL: missing hostname')

    if port is None:  # pragma: no cover
        port = 443 if parsed.scheme == 'https' else 80

    if parsed.scheme == 'http':
        conn_cls = httplib.HTTPConnection  # type: Union[type[httplib.HTTPConnection], type[httplib.HTTPSConnection]]
    elif parsed.scheme == 'https':  # pragma: no cover
        conn_cls = httplib.HTTPSConnection
    else:
        raise ValueError('Invalid URL: unsupported scheme')

    # Python 2.6 doesn't support timeout parameter
    conn_kwargs = {} if constants.PY2_6 else {'timeout': config.HTTP_UPLOAD_TIMEOUT}
    conn = conn_cls(host, port, **conn_kwargs)  # type: ignore[arg-type]

    headers = {}
    if auth_string:
        headers['Authorization'] = 'Basic {0}'.format(auth_string)

    headers['Content-Type'] = 'application/octet-stream'
    headers['Content-Length'] = str(file_size)
    headers['KC-Version'] = constants.VERSION

    try:
        # Use the lower-level API to support streaming
        conn.putrequest('PUT', url_path)

        # Send headers
        for header, value in headers.items():
            conn.putheader(header, value)
        conn.endheaders()

        # Stream file data: `send` reads from the file object itself
        with open(file_path, 'rb') as f:
            conn.send(f)

        response = conn.getresponse()

        if response.status >= 400:
            # Read response body for error details (best effort)
            try:
                error_body = response.read()
            except Exception:  # pragma: no cover
                error_body = None

            error_msg = 'Failed to upload file: HTTP {0}'.format(response.status)
            if error_body is not None:  # pragma: no branch
                # On Python 3 the body is bytes; decode it so the message does
                # not carry a "b'...'" repr. On Python 2 the body is already a
                # native str and is used unchanged.
                if isinstance(error_body, str):
                    error_text = error_body
                else:
                    error_text = error_body.decode('utf-8', 'replace')
                error_msg += ' - {0}'.format(error_text)

            log_utils.logerror(error_msg)

            # HTTPError compatible with retry mechanism
            raise HTTPError(upload_url, response.status, error_msg, None, None)  # type: ignore[arg-type]
    finally:
        conn.close()