PostgreSQL
 sql >> Base de données >  >> RDS >> PostgreSQL

Comment utiliser bindparam() dans une expression compilée personnalisée ?

J'ai compris. Les valeurs réelles de bindparams sont conservées dans un objet appelé SQLCompiler qui est généralement spécifique au dialecte. Ici">Ici(lien vers GitHub) est l'endroit où les bindparams sont stockés dans un SQLCompiler instance pendant le processus de compilation de la requête

Ainsi, la version finale de mon extrait de code ressemble à ceci :

class values(FromClause):
    named_with_column = True

    def __init__(self, columns, *args, **kw):
        self._column_args = columns
        self.list = args
        self.alias_name = self.name = kw.pop('alias_name', None)

    def _populate_column_collection(self):
        # self._columns.update((col.name, col) for col in self._column_args)
        for c in self._column_args:
            c._make_proxy(self, c.name)


@compiles(values)
def compile_values(clause, compiler, asfrom=False, **kw):
    def decide(value, column):
        add_type_hint = False
        if isinstance(value, array) and not value.clauses:  # for empty array literals
            add_type_hint = True

        if isinstance(value, ClauseElement):
            intermediate = compiler.process(value)
            if add_type_hint:
                intermediate += '::' + str(column.type)
            return intermediate

        elif value is None:
            return compiler.render_literal_value(
                value,
                NULLTYPE
            ) + '::' + str(column.type)
        else:
            return compiler.process(
                bindparam(
                    None,
                    value=compiler.render_literal_value(
                        value,
                        column.type
                    ).strip("'")
                )
            ) + '::' + str(column.type)

    columns = clause.columns
    v = "VALUES %s" % ", ".join(
        "(%s)" % ", ".join(
            decide(elem, column)
            for elem, column in zip(tup, columns))
        for tup in clause.list
    )
    if asfrom:
        if clause.alias_name:
            v = "(%s) AS %s (%s)" % (v, clause.alias_name, (", ".join(c.name for c in clause.columns)))
        else:
            v = "(%s)" % v
    return v