HEX
Server: Apache
System: Linux server.enlacediseno.com 4.18.0-553.62.1.el8_10.x86_64 #1 SMP Wed Jul 16 04:08:25 EDT 2025 x86_64
User: maor (1069)
PHP: 7.3.33
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //opt/saltstack/salt/lib/python3.10/site-packages/salt/returners/postgres_local_cache.py
"""
Use a postgresql server for the master job cache. This helps the job cache to
cope with scale.

.. note::
    There are three PostgreSQL returners.  Any can function as an external
    :ref:`master job cache <external-job-cache>`, but each has different
    features.  SaltStack recommends
    :mod:`returners.pgjsonb <salt.returners.pgjsonb>` if you are working with
    a version of PostgreSQL that has the appropriate native binary JSON types.
    Otherwise, review
    :mod:`returners.postgres <salt.returners.postgres>` and
    :mod:`returners.postgres_local_cache <salt.returners.postgres_local_cache>`
    to see which module best suits your particular needs.

:maintainer:    gjredelinghuys@gmail.com
:maturity:      Stable
:depends:       psycopg2
:platform:      all

To enable this returner the minion will need psycopg2 installed and
the following values configured in the master config:

.. code-block:: yaml

    master_job_cache: postgres_local_cache
    master_job_cache.postgres.host: 'salt'
    master_job_cache.postgres.user: 'salt'
    master_job_cache.postgres.passwd: 'salt'
    master_job_cache.postgres.db: 'salt'
    master_job_cache.postgres.port: 5432

Running the following command as the postgres user should create the database
correctly:

.. code-block:: sql

    psql << EOF
    CREATE ROLE salt WITH PASSWORD 'salt';
    CREATE DATABASE salt WITH OWNER salt;
    EOF

In case the postgres database is on a remote host, you'll need this command also:

.. code-block:: sql

   ALTER ROLE salt WITH LOGIN;

and then:

.. code-block:: sql

    psql -h localhost -U salt << EOF
    --
    -- Table structure for table 'jids'
    --

    DROP TABLE IF EXISTS jids;
    CREATE TABLE jids (
      jid   varchar(20) PRIMARY KEY,
      started TIMESTAMP WITH TIME ZONE DEFAULT now(),
      tgt_type text NOT NULL,
      cmd text NOT NULL,
      tgt text NOT NULL,
      kwargs text NOT NULL,
      ret text NOT NULL,
      username text NOT NULL,
      arg text NOT NULL,
      fun text NOT NULL
    );

    --
    -- Table structure for table 'salt_returns'
    --
    -- note that 'success' must not have NOT NULL constraint, since
    -- some functions don't provide it.

    DROP TABLE IF EXISTS salt_returns;
    CREATE TABLE salt_returns (
      added     TIMESTAMP WITH TIME ZONE DEFAULT now(),
      fun       text NOT NULL,
      jid       varchar(20) NOT NULL,
      return    text NOT NULL,
      id        text NOT NULL,
      success   boolean
    );
    CREATE INDEX ON salt_returns (added);
    CREATE INDEX ON salt_returns (id);
    CREATE INDEX ON salt_returns (jid);
    CREATE INDEX ON salt_returns (fun);

    DROP TABLE IF EXISTS salt_events;
    CREATE TABLE salt_events (
      id SERIAL,
      tag text NOT NULL,
      data text NOT NULL,
      alter_time TIMESTAMP WITH TIME ZONE DEFAULT now(),
      master_id text NOT NULL
    );
    CREATE INDEX ON salt_events (tag);
    CREATE INDEX ON salt_events (data);
    CREATE INDEX ON salt_events (id);
    CREATE INDEX ON salt_events (master_id);
    EOF

Required python modules: psycopg2
"""

import logging
import re
import sys

import salt.utils.jid
import salt.utils.job
import salt.utils.json

# psycopg2 is optional at import time: record whether it could be imported so
# that __virtual__ can report the module as unavailable instead of this file
# failing to import.
try:
    import psycopg2

    HAS_POSTGRES = True
except ImportError:
    HAS_POSTGRES = False

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Name returned by __virtual__ when the module is usable.
__virtualname__ = "postgres_local_cache"


def __virtual__():
    """
    Decide whether this returner can be loaded.

    :return: ``__virtualname__`` when psycopg2 was imported successfully,
        otherwise a ``(False, reason)`` tuple explaining why the module is
        unavailable.
    """
    if not HAS_POSTGRES:
        return (False, "Could not import psycopg2; postgres_local_cache disabled")
    return __virtualname__


def _get_conn():
    """
    Return a postgres connection.

    The connection parameters are read from the
    ``master_job_cache.postgres.*`` keys of ``__opts__``.

    :return: the connection returned by ``psycopg2.connect``, or ``None`` if
        connecting raised ``psycopg2.OperationalError`` (the error is logged).
    """
    try:
        conn = psycopg2.connect(
            host=__opts__["master_job_cache.postgres.host"],
            user=__opts__["master_job_cache.postgres.user"],
            password=__opts__["master_job_cache.postgres.passwd"],
            database=__opts__["master_job_cache.postgres.db"],
            port=__opts__["master_job_cache.postgres.port"],
        )
    except psycopg2.OperationalError as exc:
        # Log the exception itself, not just its class, so the reason for the
        # failure is visible in the log.
        log.error("Could not connect to SQL server: %s", exc)
        return None
    return conn


def _close_conn(conn):
    """
    Close the postgres connection.
    """
    conn.commit()
    conn.close()


def _format_job_instance(job):
    """
    Build the display dict for a job from its stored load.

    Keys missing from ``job`` fall back to placeholder values. The ``arg``
    entry is decoded with ``salt.utils.json.loads`` (default ``"[]"``).
    """
    function = job.get("fun", "unknown-function")
    arguments = salt.utils.json.loads(job.get("arg", "[]"))
    # TODO: Add Metadata support when it is merged from develop
    return {
        "Function": function,
        "Arguments": arguments,
        # unlikely but safeguard from invalid returns
        "Target": job.get("tgt", "unknown-target"),
        "Target-type": job.get("tgt_type", "list"),
        "User": job.get("user", "root"),
    }


def _format_jid_instance(jid, job):
    """
    Format a job via ``_format_job_instance`` and add its start time.

    The ``StartTime`` entry is computed from ``jid`` with
    ``salt.utils.jid.jid_to_time``.
    """
    formatted = _format_job_instance(job)
    formatted["StartTime"] = salt.utils.jid.jid_to_time(jid)
    return formatted


def _gen_jid(cur):
    """
    Generate a candidate job id and check it against the ``jids`` table.

    :param cur: database cursor used to run the lookup
    :return: the new jid if no row with that jid exists, otherwise ``None``
    """
    candidate = salt.utils.jid.gen_jid(__opts__)
    cur.execute("""SELECT jid FROM jids WHERE jid = %s""", (candidate,))
    existing = cur.fetchall()
    return None if existing else candidate


def prep_jid(nocache=False, passed_jid=None):
    """
    Return a job id, generating one unless ``passed_jid`` is given.

    This is the function responsible for making sure jids don't collide
    (unless it is passed a jid): generated jids are checked against the
    ``jids`` table and regenerated until an unused one is found.

    :param nocache: not used by this returner
    :param passed_jid: jid to use instead of generating one
    :return: the job id, or ``None`` if no database connection could be made
    """
    conn = _get_conn()
    if conn is None:
        return None
    try:
        cur = conn.cursor()
        try:
            jid = _gen_jid(cur) if passed_jid is None else passed_jid
            while not jid:
                log.info("jid clash, generating a new one")
                jid = _gen_jid(cur)
        finally:
            cur.close()
    finally:
        # Release the connection even if a lookup query fails.
        conn.close()
    return jid


def returner(load):
    """
    Return data to a postgres server

    Inserts one row into ``salt_returns``. The ``return`` column receives a
    JSON object holding the stringified return value plus ``retcode`` and
    ``success`` when the load provides them.

    :param load: job return dict; ``fun``, ``jid``, ``id`` and ``return`` are
        read unconditionally, ``retcode`` and ``success`` are optional
    :return: ``None``
    """
    conn = _get_conn()
    if conn is None:
        return None
    sql = """INSERT INTO salt_returns
            (fun, jid, return, id, success)
            VALUES (%s, %s, %s, %s, %s)"""
    try:
        job_ret = {"return": str(load["return"])}
        for key in ("retcode", "success"):
            if key in load:
                job_ret[key] = load[key]
        cur = conn.cursor()
        cur.execute(
            sql,
            (
                load["fun"],
                load["jid"],
                salt.utils.json.dumps(job_ret),
                load["id"],
                load.get("success"),
            ),
        )
        conn.commit()
    finally:
        # Close even when building the row, the insert or the commit fails,
        # so the connection is never leaked.
        conn.close()
    return None


def event_return(events):
    """
    Return event to a postgres server

    Require that configuration be enabled via 'event_return'
    option in master config.

    Each event becomes one row in ``salt_events``; all rows are committed
    together after the last insert.

    :param events: iterable of event dicts; ``tag`` and ``data`` default to
        an empty string when missing
    :return: ``None``
    """
    conn = _get_conn()
    if conn is None:
        return None
    # The statement is the same for every event, so build it once.
    sql = """INSERT INTO salt_events
                (tag, data, master_id)
                VALUES (%s, %s, %s)"""
    try:
        cur = conn.cursor()
        for event in events:
            tag = event.get("tag", "")
            data = event.get("data", "")
            cur.execute(sql, (tag, salt.utils.json.dumps(data), __opts__["id"]))
        conn.commit()
    finally:
        # Close even when an insert or the commit fails so the connection is
        # never leaked.
        conn.close()
    return None


def save_load(jid, clear_load, minions=None):
    """
    Save the load to the specified jid id

    Inserts one row into ``jids``. Every field of ``clear_load`` is stored as
    ``str(...)`` of its value; ``arg`` is JSON-encoded first.

    :param jid: job id; single quotes are stripped via ``_escape_jid``
    :param clear_load: dict describing the job (``tgt_type``, ``cmd``,
        ``tgt``, ``kwargs``, ``ret``, ``user``, ``arg``, ``fun``)
    :param minions: not used by this returner
    :return: ``None``
    """
    jid = _escape_jid(jid)
    conn = _get_conn()
    if conn is None:
        return None
    sql = (
        """INSERT INTO jids """
        """(jid, started, tgt_type, cmd, tgt, kwargs, ret, username, arg,"""
        """ fun) """
        """VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
    )

    try:
        cur = conn.cursor()
        cur.execute(
            sql,
            (
                jid,
                salt.utils.jid.jid_to_time(jid),
                str(clear_load.get("tgt_type")),
                str(clear_load.get("cmd")),
                str(clear_load.get("tgt")),
                str(clear_load.get("kwargs")),
                str(clear_load.get("ret")),
                str(clear_load.get("user")),
                str(salt.utils.json.dumps(clear_load.get("arg"))),
                str(clear_load.get("fun")),
            ),
        )
        # TODO: Add Metadata support when it is merged from develop
        conn.commit()
    finally:
        # Close even when the insert or the commit fails so the connection is
        # never leaked.
        conn.close()
    return None


def save_minions(jid, minions, syndic_id=None):  # pylint: disable=unused-argument
    """
    Included for API consistency.

    This implementation ignores its arguments, does nothing and returns
    ``None``.
    """
    return None


def _escape_jid(jid):
    """
    Do proper formatting of the jid
    """
    jid = str(jid)
    jid = re.sub(r"'*", "", jid)
    return jid


def _build_dict(data):
    """
    Rebuild dict
    """
    result = {}
    # TODO: Add Metadata support when it is merged from develop
    result["jid"] = data[0]
    result["tgt_type"] = data[1]
    result["cmd"] = data[2]
    result["tgt"] = data[3]
    result["kwargs"] = data[4]
    result["ret"] = data[5]
    result["user"] = data[6]
    result["arg"] = data[7]
    result["fun"] = data[8]
    return result


def get_load(jid):
    """
    Return the load data that marks a specified jid

    :param jid: job id; single quotes are stripped via ``_escape_jid``
    :return: the load dict built by ``_build_dict`` when a matching row
        exists, ``{}`` when none does, or ``None`` if no database connection
        could be made
    """
    jid = _escape_jid(jid)
    conn = _get_conn()
    if conn is None:
        return None
    sql = (
        """SELECT jid, tgt_type, cmd, tgt, kwargs, ret, username, arg,"""
        """ fun FROM jids WHERE jid = %s"""
    )
    try:
        cur = conn.cursor()
        cur.execute(sql, (jid,))
        data = cur.fetchone()
    finally:
        # Close on every path, including when a row was found; this is a
        # read-only query, so there is nothing to commit.
        conn.close()
    if data:
        return _build_dict(data)
    return {}


def get_jid(jid):
    """
    Return the information returned when the specified job id was executed

    :param jid: job id; single quotes are stripped via ``_escape_jid``
    :return: dict mapping each ``id`` found in ``salt_returns`` for the jid
        to its decoded return dict, or ``None`` if no database connection
        could be made
    """
    jid = _escape_jid(jid)
    conn = _get_conn()
    if conn is None:
        return None
    sql = """SELECT id, return FROM salt_returns WHERE jid = %s"""

    try:
        cur = conn.cursor()
        cur.execute(sql, (jid,))
        data = cur.fetchall()
    finally:
        # Read-only query: nothing to commit, but always release the
        # connection, even if the query fails.
        conn.close()

    ret = {}
    for minion, full_ret in data:
        ret_data = salt.utils.json.loads(full_ret)
        if not isinstance(ret_data, dict) or "return" not in ret_data:
            # Convert the old format in which the return contains the only return data to the
            # new that is dict containing 'return' and optionally 'retcode' and 'success'.
            ret_data = {"return": ret_data}
        ret[minion] = ret_data
    return ret


def get_jids():
    """
    Return all job ids with their formatted job information

    When the value from ``salt.utils.job.get_keep_jobs_seconds`` is non-zero,
    only jobs started within that many seconds are returned.

    :return: dict mapping each jid to the dict produced by
        ``_format_jid_instance``; an empty dict if no database connection
        could be made
    """
    conn = _get_conn()
    if conn is None:
        # Without this guard an unreachable database surfaced as an
        # AttributeError on None; an empty dict keeps the return type stable.
        return {}
    sql = (
        """SELECT """
        """jid, tgt_type, cmd, tgt, kwargs, ret, username, arg, fun """
        """FROM jids"""
    )
    try:
        keep_jobs_seconds = int(salt.utils.job.get_keep_jobs_seconds(__opts__))
        if keep_jobs_seconds != 0:
            # The value has been through int(), so embedding it in the
            # statement cannot inject SQL.
            sql = (
                sql
                + " WHERE started > NOW() - INTERVAL '"
                + str(keep_jobs_seconds)
                + "' SECOND"
            )
        cur = conn.cursor()
        cur.execute(sql)
        rows = cur.fetchall()
    finally:
        # Always release the connection, even if the query fails.
        conn.close()

    ret = {}
    for row in rows:
        data_dict = _build_dict(row)
        ret[data_dict["jid"]] = _format_jid_instance(data_dict["jid"], data_dict)
    return ret


def clean_old_jobs():
    """
    Clean out the old jobs from the job cache.

    This implementation performs no work and returns ``None``.
    """
    return None