File: //opt/saltstack/salt/lib/python3.10/site-packages/salt/utils/dockermod/translate/network.py
"""
Functions to translate input for network creation
"""
from salt.exceptions import SaltInvocationError
from . import helpers
ALIASES = {
"driver_opt": "options",
"driver_opts": "options",
"ipv6": "enable_ipv6",
}
IPAM_ALIASES = {
"ip_range": "iprange",
"aux_address": "aux_addresses",
}
# ALIASES is a superset of IPAM_ALIASES
ALIASES.update(IPAM_ALIASES)
ALIASES_REVMAP = {y: x for x, y in ALIASES.items()}
DEFAULTS = {"check_duplicate": True}
def _post_processing(
kwargs, skip_translate, invalid
): # pylint: disable=unused-argument
"""
Additional network-specific post-translation processing
"""
# If any defaults were not expicitly passed, add them
for item in DEFAULTS:
if item not in kwargs:
kwargs[item] = DEFAULTS[item]
# Functions below must match names of docker-py arguments
def driver(val, **kwargs): # pylint: disable=unused-argument
return helpers.translate_str(val)
def options(val, **kwargs): # pylint: disable=unused-argument
return helpers.translate_key_val(val, delimiter="=")
def ipam(val, **kwargs): # pylint: disable=unused-argument
return helpers.translate_dict(val)
def check_duplicate(val, **kwargs): # pylint: disable=unused-argument
return helpers.translate_bool(val)
def internal(val, **kwargs): # pylint: disable=unused-argument
return helpers.translate_bool(val)
def labels(val, **kwargs): # pylint: disable=unused-argument
return helpers.translate_labels(val)
def enable_ipv6(val, **kwargs): # pylint: disable=unused-argument
return helpers.translate_bool(val)
def attachable(val, **kwargs): # pylint: disable=unused-argument
return helpers.translate_bool(val)
def ingress(val, **kwargs): # pylint: disable=unused-argument
return helpers.translate_bool(val)
# IPAM args
def ipam_driver(val, **kwargs): # pylint: disable=unused-argument
return driver(val, **kwargs)
def ipam_opts(val, **kwargs): # pylint: disable=unused-argument
return options(val, **kwargs)
def ipam_pools(val, **kwargs): # pylint: disable=unused-argument
if not hasattr(val, "__iter__") or not all(isinstance(x, dict) for x in val):
# Can't do a simple dictlist check because each dict may have more than
# one element.
raise SaltInvocationError("ipam_pools must be a list of dictionaries")
skip_translate = kwargs.get("skip_translate", ())
if not (skip_translate is True or "ipam_pools" in skip_translate):
_globals = globals()
for ipam_dict in val:
for key in list(ipam_dict):
if skip_translate is not True and key in skip_translate:
continue
if key in IPAM_ALIASES:
# Make sure we resolve aliases, since this wouldn't have
# been done within the individual IPAM dicts
ipam_dict[IPAM_ALIASES[key]] = ipam_dict.pop(key)
key = IPAM_ALIASES[key]
if key in _globals:
ipam_dict[key] = _globals[key](ipam_dict[key])
return val
def subnet(val, **kwargs): # pylint: disable=unused-argument
validate_ip_addrs = kwargs.get("validate_ip_addrs", True)
val = helpers.translate_str(val)
if validate_ip_addrs:
helpers.validate_subnet(val)
return val
def iprange(val, **kwargs): # pylint: disable=unused-argument
validate_ip_addrs = kwargs.get("validate_ip_addrs", True)
val = helpers.translate_str(val)
if validate_ip_addrs:
helpers.validate_subnet(val)
return val
def gateway(val, **kwargs): # pylint: disable=unused-argument
validate_ip_addrs = kwargs.get("validate_ip_addrs", True)
val = helpers.translate_str(val)
if validate_ip_addrs:
helpers.validate_ip(val)
return val
def aux_addresses(val, **kwargs): # pylint: disable=unused-argument
validate_ip_addrs = kwargs.get("validate_ip_addrs", True)
val = helpers.translate_key_val(val, delimiter="=")
if validate_ip_addrs:
for address in val.values():
helpers.validate_ip(address)
return val