标签归档:sql

sql查询中的python列表作为参数

问题:sql查询中的python列表作为参数

我有一个python列表,说我

l = [1,5,8]

我想编写一个SQL查询来获取列表中所有元素的数据,例如

select name from students where id = |IN THE LIST l|

我该如何完成?

I have a python list, say l

l = [1,5,8]

I want to write a sql query to get the data for all the elements of the list, say

select name from students where id = |IN THE LIST l|

How do I accomplish this?


回答 0

到目前为止,答案一直是将这些值模板化为纯SQL字符串。这对于整数绝对没问题,但是如果我们想对字符串进行处理,则会遇到转义问题。

这是一个使用参数化查询的变体,它对两个都适用:

placeholder= '?' # For SQLite. See DBAPI paramstyle.
placeholders= ', '.join(placeholder for unused in l)
query= 'SELECT name FROM students WHERE id IN (%s)' % placeholders
cursor.execute(query, l)

Answers so far have been templating the values into a plain SQL string. That’s absolutely fine for integers, but if we wanted to do it for strings we get the escaping issue.

Here’s a variant using a parameterised query that would work for both:

placeholder= '?' # For SQLite. See DBAPI paramstyle.
placeholders= ', '.join(placeholder for unused in l)
query= 'SELECT name FROM students WHERE id IN (%s)' % placeholders
cursor.execute(query, l)

回答 1

最简单的方法是将列表转到tuple第一

t = tuple(l)
query = "select name from studens where id IN {}".format(t)

Easiest way is to turn the list to tuple first

t = tuple(l)
query = "select name from studens where id IN {}".format(t)

回答 2

不要使其复杂化,解决方案很简单。

l = [1,5,8]

l = tuple(l)

params = {'l': l}

cursor.execute('SELECT * FROM table where id in %(l)s',params)

我希望这可以帮助!!!

Dont complicate it, Solution for this is simple.

l = [1,5,8]

l = tuple(l)

params = {'l': l}

cursor.execute('SELECT * FROM table where id in %(l)s',params)

I hope this helped !!!


回答 3

您想要的SQL是

select name from studens where id in (1, 5, 8)

如果您想从python构造它,可以使用

l = [1, 5, 8]
sql_query = 'select name from studens where id in (' + ','.join(map(str, l)) + ')'

地图功能将改变列表转换成可以通过使用逗号胶合在一起的字符串列表str.join方法。

或者:

l = [1, 5, 8]
sql_query = 'select name from studens where id in (' + ','.join((str(n) for n in l)) + ')'

如果您更喜欢生成器表达式而不是map函数。

更新:S. Lott在评论中提到Python SQLite绑定不支持序列。在这种情况下,您可能想要

select name from studens where id = 1 or id = 5 or id = 8

产生者

sql_query = 'select name from studens where ' + ' or '.join(('id = ' + str(n) for n in l))

The SQL you want is

select name from studens where id in (1, 5, 8)

If you want to construct this from the python you could use

l = [1, 5, 8]
sql_query = 'select name from studens where id in (' + ','.join(map(str, l)) + ')'

The map function will transform the list into a list of strings that can be glued together by commas using the str.join method.

Alternatively:

l = [1, 5, 8]
sql_query = 'select name from studens where id in (' + ','.join((str(n) for n in l)) + ')'

if you prefer generator expressions to the map function.

UPDATE: S. Lott mentions in the comments that the Python SQLite bindings don’t support sequences. In that case, you might want

select name from studens where id = 1 or id = 5 or id = 8

Generated by

sql_query = 'select name from studens where ' + ' or '.join(('id = ' + str(n) for n in l))

回答 4

string.join用逗号分隔的列表值,并使用format运算符形成查询字符串。

myquery = "select name from studens where id in (%s)" % ",".join(map(str,mylist))

(谢谢,布莱尔康拉德

string.join the list values separated by commas, and use the format operator to form a query string.

myquery = "select name from studens where id in (%s)" % ",".join(map(str,mylist))

(Thanks, blair-conrad)


回答 5

我喜欢bobince的回答:

placeholder= '?' # For SQLite. See DBAPI paramstyle.
placeholders= ', '.join(placeholder for unused in l)
query= 'SELECT name FROM students WHERE id IN (%s)' % placeholders
cursor.execute(query, l)

但是我注意到了这一点:

placeholders= ', '.join(placeholder for unused in l)

可以替换为:

placeholders= ', '.join(placeholder*len(l))

如果不太聪明和不太笼统,我会觉得这更直接。这里l需要有一个长度(即,引用一个定义__len__方法的对象),这应该不是问题。但是占位符也必须是单个字符。要支持多字符占位符使用:

placeholders= ', '.join([placeholder]*len(l))

I like bobince’s answer:

placeholder= '?' # For SQLite. See DBAPI paramstyle.
placeholders= ', '.join(placeholder for unused in l)
query= 'SELECT name FROM students WHERE id IN (%s)' % placeholders
cursor.execute(query, l)

But I noticed this:

placeholders= ', '.join(placeholder for unused in l)

Can be replaced with:

placeholders= ', '.join(placeholder*len(l))

I find this more direct if less clever and less general. Here l is required to have a length (i.e. refer to an object that defines a __len__ method), which shouldn’t be a problem. But placeholder must also be a single character. To support a multi-character placeholder use:

placeholders= ', '.join([placeholder]*len(l))

回答 6

@umount答案的解决方案,因为它用一个元素的元组中断,因为(1,)不是有效的SQL。

>>> random_ids = [1234,123,54,56,57,58,78,91]
>>> cursor.execute("create table test (id)")
>>> for item in random_ids:
    cursor.execute("insert into test values (%d)" % item)
>>> sublist = [56,57,58]
>>> cursor.execute("select id from test where id in %s" % str(tuple(sublist)).replace(',)',')'))
>>> a = cursor.fetchall()
>>> a
[(56,), (57,), (58,)]

sql字符串的其他解决方案:

cursor.execute("select id from test where id in (%s)" % ('"'+'", "'.join(l)+'"'))

Solution for @umounted answer, because that broke with a one-element tuple, since (1,) is not valid SQL.:

>>> random_ids = [1234,123,54,56,57,58,78,91]
>>> cursor.execute("create table test (id)")
>>> for item in random_ids:
    cursor.execute("insert into test values (%d)" % item)
>>> sublist = [56,57,58]
>>> cursor.execute("select id from test where id in %s" % str(tuple(sublist)).replace(',)',')'))
>>> a = cursor.fetchall()
>>> a
[(56,), (57,), (58,)]

Other solution for sql string:

cursor.execute("select id from test where id in (%s)" % ('"'+'", "'.join(l)+'"'))

回答 7

placeholders= ', '.join("'{"+str(i)+"}'" for i in range(len(l)))
query="select name from students where id (%s)"%placeholders
query=query.format(*l)
cursor.execute(query)

这应该可以解决您的问题。

placeholders= ', '.join("'{"+str(i)+"}'" for i in range(len(l)))
query="select name from students where id (%s)"%placeholders
query=query.format(*l)
cursor.execute(query)

This should solve your problem.


回答 8

如果您将PostgreSQL与Psycopg2库一起使用,则可以让其元组适应为您完成所有转义和字符串插值,例如:

ids = [1,2,3]
cur.execute(
  "SELECT * FROM foo WHERE id IN %s",
  [tuple(ids)])

即只需确保您将IN参数作为传递tuple。如果是,则list可以使用= ANY数组语法

cur.execute(
  "SELECT * FROM foo WHERE id = ANY (%s)",
  [list(ids)])

请注意,这两个都将变成相同的查询计划,因此您应该只使用较容易的那个。例如,如果您的列表位于一个元组中,则使用前者;如果它们存储在列表中,则使用后者。

If you’re using PostgreSQL with the Psycopg2 library you can let its tuple adaption do all the escaping and string interpolation for you, e.g:

ids = [1,2,3]
cur.execute(
  "SELECT * FROM foo WHERE id IN %s",
  [tuple(ids)])

i.e. just make sure that you’re passing the IN parameter as a tuple. if it’s a list you can use the = ANY array syntax:

cur.execute(
  "SELECT * FROM foo WHERE id = ANY (%s)",
  [list(ids)])

note that these both will get turned into the same query plan so you should just use whichever is easier. e.g. if your list comes in a tuple use the former, if they’re stored in a list use the latter.


回答 9

例如,如果要使用sql查询:

select name from studens where id in (1, 5, 8)

关于什么:

my_list = [1, 5, 8]
cur.execute("select name from studens where id in %s" % repr(my_list).replace('[','(').replace(']',')') )

For example, if you want the sql query:

select name from studens where id in (1, 5, 8)

What about:

my_list = [1, 5, 8]
cur.execute("select name from studens where id in %s" % repr(my_list).replace('[','(').replace(']',')') )

回答 10

一个更简单的解决方案:

lst = [1,2,3,a,b,c]

query = f"""SELECT * FROM table WHERE IN {str(lst)[1:-1}"""

a simpler solution:

lst = [1,2,3,a,b,c]

query = f"""SELECT * FROM table WHERE IN {str(lst)[1:-1}"""

回答 11

l = [1] # or [1,2,3]

query = "SELECT * FROM table WHERE id IN :l"
params = {'l' : tuple(l)}
cursor.execute(query, params)

:var符号似乎简单。(Python 3.7)

l = [1] # or [1,2,3]

query = "SELECT * FROM table WHERE id IN :l"
params = {'l' : tuple(l)}
cursor.execute(query, params)

The :var notation seems simpler. (Python 3.7)


回答 12

这使用参数替换并处理单个值列表的情况:

l = [1,5,8]

get_operator = lambda x: '=' if len(x) == 1 else 'IN'
get_value = lambda x: int(x[0]) if len(x) == 1 else x

query = 'SELECT * FROM table where id ' + get_operator(l) + ' %s'

cursor.execute(query, (get_value(l),))

This uses parameter substitution and takes care of the single value list case:

l = [1,5,8]

get_operator = lambda x: '=' if len(x) == 1 else 'IN'
get_value = lambda x: int(x[0]) if len(x) == 1 else x

query = 'SELECT * FROM table where id ' + get_operator(l) + ' %s'

cursor.execute(query, (get_value(l),))

如何从sqlite查询中获取字典?

问题:如何从sqlite查询中获取字典?

db = sqlite.connect("test.sqlite")
res = db.execute("select * from table")

通过迭代,我得到了对应于行的列表。

for row in res:
    print row

我可以得到列的名称

col_name_list = [tuple[0] for tuple in res.description]

但是是否有一些功能或设置可以获取字典而不是列表?

{'col1': 'value', 'col2': 'value'}

还是我必须自己做?

db = sqlite.connect("test.sqlite")
res = db.execute("select * from table")

With iteration I get lists coresponding to the rows.

for row in res:
    print row

I can get name of the columns

col_name_list = [tuple[0] for tuple in res.description]

But is there some function or setting to get dictionaries instead of list?

{'col1': 'value', 'col2': 'value'}

or I have to do myself?


回答 0

您可以使用row_factory,如docs中的示例所示:

import sqlite3

def dict_factory(cursor, row):
    d = {}
    for idx, col in enumerate(cursor.description):
        d[col[0]] = row[idx]
    return d

con = sqlite3.connect(":memory:")
con.row_factory = dict_factory
cur = con.cursor()
cur.execute("select 1 as a")
print cur.fetchone()["a"]

或按照文档中此示例之后给出的建议进行操作:

如果返回一个元组还不够,并且您希望基于名称的列访问,则应考虑将row_factory设置为高度优化的sqlite3.Row类型。Row提供对列的基于索引和不区分大小写的基于名称的访问,几乎没有内存开销。它可能比您自己的基于字典的自定义方法甚至基于db_row的解决方案都要好。

You could use row_factory, as in the example in the docs:

import sqlite3

def dict_factory(cursor, row):
    d = {}
    for idx, col in enumerate(cursor.description):
        d[col[0]] = row[idx]
    return d

con = sqlite3.connect(":memory:")
con.row_factory = dict_factory
cur = con.cursor()
cur.execute("select 1 as a")
print cur.fetchone()["a"]

or follow the advice that’s given right after this example in the docs:

If returning a tuple doesn’t suffice and you want name-based access to columns, you should consider setting row_factory to the highly-optimized sqlite3.Row type. Row provides both index-based and case-insensitive name-based access to columns with almost no memory overhead. It will probably be better than your own custom dictionary-based approach or even a db_row based solution.


回答 1

我以为我已经回答了这个问题,即使亚当·施密德(Adam Schmideg)和亚历克斯·马特利(Alex Martelli)的回答中都提到了部分答案。为了让其他像我一样有相同问题的人,可以轻松找到答案。

conn = sqlite3.connect(":memory:")

#This is the important part, here we are setting row_factory property of
#connection object to sqlite3.Row(sqlite3.Row is an implementation of
#row_factory)
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute('select * from stocks')

result = c.fetchall()
#returns a list of dictionaries, each item in list(each dictionary)
#represents a row of the table

I thought I answer this question even though the answer is partly mentioned in both Adam Schmideg’s and Alex Martelli’s answers. In order for others like me that have the same question, to find the answer easily.

conn = sqlite3.connect(":memory:")

#This is the important part, here we are setting row_factory property of
#connection object to sqlite3.Row(sqlite3.Row is an implementation of
#row_factory)
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute('select * from stocks')

result = c.fetchall()
#returns a list of dictionaries, each item in list(each dictionary)
#represents a row of the table

回答 2

即使使用sqlite3.Row类-您仍然不能使用以下形式的字符串格式:

print "%(id)i - %(name)s: %(value)s" % row

为了解决这个问题,我使用了一个辅助函数,该函数接受行并将其转换为字典。我仅在字典对象比Row对象更可取时才使用它(例如,对于诸如字符串格式之类的东西,其中Row对象本身也不支持字典API)。但是其他所有时间都使用Row对象。

def dict_from_row(row):
    return dict(zip(row.keys(), row))       

Even using the sqlite3.Row class– you still can’t use string formatting in the form of:

print "%(id)i - %(name)s: %(value)s" % row

In order to get past this, I use a helper function that takes the row and converts to a dictionary. I only use this when the dictionary object is preferable to the Row object (e.g. for things like string formatting where the Row object doesn’t natively support the dictionary API as well). But use the Row object all other times.

def dict_from_row(row):
    return dict(zip(row.keys(), row))       

回答 3

连接到SQLite之后: con = sqlite3.connect(.....)只需运行即可:

con.row_factory = sqlite3.Row

瞧!

After you connect to SQLite: con = sqlite3.connect(.....) it is sufficient to just run:

con.row_factory = sqlite3.Row

Voila!


回答 4

PEP 249

Question: 

   How can I construct a dictionary out of the tuples returned by
   .fetch*():

Answer:

   There are several existing tools available which provide
   helpers for this task. Most of them use the approach of using
   the column names defined in the cursor attribute .description
   as basis for the keys in the row dictionary.

   Note that the reason for not extending the DB API specification
   to also support dictionary return values for the .fetch*()
   methods is that this approach has several drawbacks:

   * Some databases don't support case-sensitive column names or
     auto-convert them to all lowercase or all uppercase
     characters.

   * Columns in the result set which are generated by the query
     (e.g.  using SQL functions) don't map to table column names
     and databases usually generate names for these columns in a
     very database specific way.

   As a result, accessing the columns through dictionary keys
   varies between databases and makes writing portable code
   impossible.

所以是的,你自己做。

From PEP 249:

Question: 

   How can I construct a dictionary out of the tuples returned by
   .fetch*():

Answer:

   There are several existing tools available which provide
   helpers for this task. Most of them use the approach of using
   the column names defined in the cursor attribute .description
   as basis for the keys in the row dictionary.

   Note that the reason for not extending the DB API specification
   to also support dictionary return values for the .fetch*()
   methods is that this approach has several drawbacks:

   * Some databases don't support case-sensitive column names or
     auto-convert them to all lowercase or all uppercase
     characters.

   * Columns in the result set which are generated by the query
     (e.g.  using SQL functions) don't map to table column names
     and databases usually generate names for these columns in a
     very database specific way.

   As a result, accessing the columns through dictionary keys
   varies between databases and makes writing portable code
   impossible.

So yes, do it yourself.


回答 5

较短的版本:

db.row_factory = lambda c, r: dict([(col[0], r[idx]) for idx, col in enumerate(c.description)])

Shorter version:

db.row_factory = lambda c, r: dict([(col[0], r[idx]) for idx, col in enumerate(c.description)])

回答 6

在我的测试中最快:

conn.row_factory = lambda c, r: dict(zip([col[0] for col in c.description], r))
c = conn.cursor()

%timeit c.execute('SELECT * FROM table').fetchall()
19.8 µs ± 1.05 µs per loop (mean ± std. dev. of 7 runs, 100000 loops each)

vs:

conn.row_factory = lambda c, r: dict([(col[0], r[idx]) for idx, col in enumerate(c.description)])
c = conn.cursor()

%timeit c.execute('SELECT * FROM table').fetchall()
19.4 µs ± 75.6 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

你决定 :)

Fastest on my tests:

conn.row_factory = lambda c, r: dict(zip([col[0] for col in c.description], r))
c = conn.cursor()

%timeit c.execute('SELECT * FROM table').fetchall()
19.8 µs ± 1.05 µs per loop (mean ± std. dev. of 7 runs, 100000 loops each)

vs:

conn.row_factory = lambda c, r: dict([(col[0], r[idx]) for idx, col in enumerate(c.description)])
c = conn.cursor()

%timeit c.execute('SELECT * FROM table').fetchall()
19.4 µs ± 75.6 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

You decide :)


回答 7

与上述解决方案类似,但最紧凑:

db.row_factory = lambda C, R: { c[0]: R[i] for i, c in enumerate(C.description) }

Similar like before-mentioned solutions, but most compact:

db.row_factory = lambda C, R: { c[0]: R[i] for i, c in enumerate(C.description) }

回答 8

正如@gandalf的答案所提到的,必须使用conn.row_factory = sqlite3.Row,但是结果不是直接的字典。必须dict在上一个循环中添加一个附加的“ cast” :

import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute('create table t (a text, b text, c text)')
conn.execute('insert into t values ("aaa", "bbb", "ccc")')
conn.execute('insert into t values ("AAA", "BBB", "CCC")')
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute('select * from t')
for r in c.fetchall():
    print(dict(r))

# {'a': 'aaa', 'b': 'bbb', 'c': 'ccc'}
# {'a': 'AAA', 'b': 'BBB', 'c': 'CCC'}

As mentioned by @gandalf’s answer, one has to use conn.row_factory = sqlite3.Row, but the results are not directly dictionaries. One has to add an additional “cast” to dict in the last loop:

import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute('create table t (a text, b text, c text)')
conn.execute('insert into t values ("aaa", "bbb", "ccc")')
conn.execute('insert into t values ("AAA", "BBB", "CCC")')
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute('select * from t')
for r in c.fetchall():
    print(dict(r))

# {'a': 'aaa', 'b': 'bbb', 'c': 'ccc'}
# {'a': 'AAA', 'b': 'BBB', 'c': 'CCC'}

回答 9

我认为您在正确的轨道上。让我们保持非常简单并完成您要执行的操作:

import sqlite3
db = sqlite3.connect("test.sqlite3")
cur = db.cursor()
res = cur.execute("select * from table").fetchall()
data = dict(zip([c[0] for c in cur.description], res[0]))

print(data)

缺点是.fetchall(),这是您消耗内存的谋杀手段,如果表很大。但是对于仅处理数千行文本和数字列的琐碎应用程序而言,这种简单的方法就足够了。

对于严重的问题,您应该按照其他许多答案中的建议研究行工厂。

I think you were on the right track. Let’s keep this very simple and complete what you were trying to do:

import sqlite3
db = sqlite3.connect("test.sqlite3")
cur = db.cursor()
res = cur.execute("select * from table").fetchall()
data = dict(zip([c[0] for c in cur.description], res[0]))

print(data)

The downside is that .fetchall(), which is murder on your memory consumption, if your table is very large. But for trivial applications dealing with mere few thousands of rows of text and numeric columns, this simple approach is good enough.

For serious stuff, you should look into row factories, as proposed in many other answers.


回答 10

或者,您可以按以下方式将sqlite3.Rows转换为字典。这将为字典提供每一行的列表。

    def from_sqlite_Row_to_dict(list_with_rows):
    ''' Turn a list with sqlite3.Row objects into a dictionary'''
    d ={} # the dictionary to be filled with the row data and to be returned

    for i, row in enumerate(list_with_rows): # iterate throw the sqlite3.Row objects            
        l = [] # for each Row use a separate list
        for col in range(0, len(row)): # copy over the row date (ie. column data) to a list
            l.append(row[col])
        d[i] = l # add the list to the dictionary   
    return d

Or you could convert the sqlite3.Rows to a dictionary as follows. This will give a dictionary with a list for each row.

    def from_sqlite_Row_to_dict(list_with_rows):
    ''' Turn a list with sqlite3.Row objects into a dictionary'''
    d ={} # the dictionary to be filled with the row data and to be returned

    for i, row in enumerate(list_with_rows): # iterate throw the sqlite3.Row objects            
        l = [] # for each Row use a separate list
        for col in range(0, len(row)): # copy over the row date (ie. column data) to a list
            l.append(row[col])
        d[i] = l # add the list to the dictionary   
    return d

回答 11

通用替代方案,仅使用三行

def select_column_and_value(db, sql, parameters=()):
    execute = db.execute(sql, parameters)
    fetch = execute.fetchone()
    return {k[0]: v for k, v in list(zip(execute.description, fetch))}

con = sqlite3.connect('/mydatabase.db')
c = con.cursor()
print(select_column_and_value(c, 'SELECT * FROM things WHERE id=?', (id,)))

但是,如果您的查询未返回任何内容,将导致错误。在这种情况下…

def select_column_and_value(self, sql, parameters=()):
    execute = self.execute(sql, parameters)
    fetch = execute.fetchone()

    if fetch is None:
        return {k[0]: None for k in execute.description}

    return {k[0]: v for k, v in list(zip(execute.description, fetch))}

要么

def select_column_and_value(self, sql, parameters=()):
    execute = self.execute(sql, parameters)
    fetch = execute.fetchone()

    if fetch is None:
        return {}

    return {k[0]: v for k, v in list(zip(execute.description, fetch))}

A generic alternative, using just three lines

def select_column_and_value(db, sql, parameters=()):
    execute = db.execute(sql, parameters)
    fetch = execute.fetchone()
    return {k[0]: v for k, v in list(zip(execute.description, fetch))}

con = sqlite3.connect('/mydatabase.db')
c = con.cursor()
print(select_column_and_value(c, 'SELECT * FROM things WHERE id=?', (id,)))

But if your query returns nothing, will result in error. In this case…

def select_column_and_value(self, sql, parameters=()):
    execute = self.execute(sql, parameters)
    fetch = execute.fetchone()

    if fetch is None:
        return {k[0]: None for k in execute.description}

    return {k[0]: v for k, v in list(zip(execute.description, fetch))}

or

def select_column_and_value(self, sql, parameters=()):
    execute = self.execute(sql, parameters)
    fetch = execute.fetchone()

    if fetch is None:
        return {}

    return {k[0]: v for k, v in list(zip(execute.description, fetch))}

回答 12

import sqlite3

db = sqlite3.connect('mydatabase.db')
cursor = db.execute('SELECT * FROM students ORDER BY CREATE_AT')
studentList = cursor.fetchall()

columnNames = list(map(lambda x: x[0], cursor.description)) #students table column names list
studentsAssoc = {} #Assoc format is dictionary similarly


#THIS IS ASSOC PROCESS
for lineNumber, student in enumerate(studentList):
    studentsAssoc[lineNumber] = {}

    for columnNumber, value in enumerate(student):
        studentsAssoc[lineNumber][columnNames[columnNumber]] = value


print(studentsAssoc)

结果肯定是正确的,但我不知道最好的。

import sqlite3

db = sqlite3.connect('mydatabase.db')
cursor = db.execute('SELECT * FROM students ORDER BY CREATE_AT')
studentList = cursor.fetchall()

columnNames = list(map(lambda x: x[0], cursor.description)) #students table column names list
studentsAssoc = {} #Assoc format is dictionary similarly


#THIS IS ASSOC PROCESS
for lineNumber, student in enumerate(studentList):
    studentsAssoc[lineNumber] = {}

    for columnNumber, value in enumerate(student):
        studentsAssoc[lineNumber][columnNames[columnNumber]] = value


print(studentsAssoc)

The result is definitely true, but I do not know the best.


回答 13

python中的字典提供对元素的任意访问。因此,任何带有“名称”的词典,尽管一方面可能会提供更多信息(又称字段名称),却会使字段“无序”,这可能是不必要的。

最好的方法是将名称放在单独的列表中,然后根据需要自己将其与结果组合。

try:
         mycursor = self.memconn.cursor()
         mycursor.execute('''SELECT * FROM maintbl;''')
         #first get the names, because they will be lost after retrieval of rows
         names = list(map(lambda x: x[0], mycursor.description))
         manyrows = mycursor.fetchall()

         return manyrows, names

还请记住,在所有方法中,名称都是您在查询中提供的名称,而不是数据库中的名称。exceptions是SELECT * FROM

如果您唯一关心的是使用字典来获得结果,则一定要使用conn.row_factory = sqlite3.Row(已经在另一个答案中说明了)。

Dictionaries in python provide arbitrary access to their elements. So any dictionary with “names” although it might be informative on one hand (a.k.a. what are the field names) “un-orders” the fields, which might be unwanted.

Best approach is to get the names in a separate list and then combine them with the results by yourself, if needed.

try:
         mycursor = self.memconn.cursor()
         mycursor.execute('''SELECT * FROM maintbl;''')
         #first get the names, because they will be lost after retrieval of rows
         names = list(map(lambda x: x[0], mycursor.description))
         manyrows = mycursor.fetchall()

         return manyrows, names

Also remember that the names, in all approaches, are the names you provided in the query, not the names in database. Exception is the SELECT * FROM

If your only concern is to get the results using a dictionary, then definitely use the conn.row_factory = sqlite3.Row (already stated in another answer).


如何使用SqlAlchemy通过ID查询数据库?

问题:如何使用SqlAlchemy通过ID查询数据库?

我需要按其查询SQLAlchemy数据库 id类似于以下内容

User.query.filter_by(username =’peter’)

但为身份证。我该怎么做呢?[通过Google和SO搜索没有帮助]

I need to query a SQLAlchemy database by its id something similar to

User.query.filter_by(username=’peter’)

but for id. How do I do this? [Searching over Google and SO didn’t help]


回答 0

查询具有一个get函数,该函数支持通过表的主键进行查询,我认为id是这样。

例如,要查询ID为23的对象:

User.query.get(23)

注意:正如其他一些评论者和答案所述,这不仅仅是“对主键执行查询过滤”的简写。根据SQLAlchemy会话的状态,运行此代码可能会查询数据库并返回一个新实例,或者它可能会返回在代码中较早查询的对象的实例,而无需实际查询数据库。如果您尚未这样做,请考虑阅读SQLAlchemy会话上的文档以了解后果。

Query has a get function that supports querying by the primary key of the table, which I assume that id is.

For example, to query for an object with ID of 23:

User.query.get(23)

Note: As a few other commenters and answers have mentioned, this is not simply shorthand for “Perform a query filtering on the primary key”. Depending on the state of the SQLAlchemy session, running this code may query the database and return a new instance, or it may return an instance of an object queried earlier in your code without actually querying the database. If you have not already done so, consider reading the documentation on the SQLAlchemy Session to understand the ramifications.


回答 1

您可以像这样查询ID = 1的用户

session.query(User).get(1)

You can query an User with id = 1 like this

session.query(User).get(1)


回答 2

有时get()并非您所期望的:

如果您的交易已完成:

>>> session.query(User).get(1)
[SQL]: BEGIN (implicit)
[SQL]: SELECT user.id AS user_id, user.name AS user_name, user.fullname AS user_fullname
FROM user
WHERE user.id = ?
[SQL]: (1,)
<User(u'ed', u'Ed Jones')>

如果您正在进行事务处理(get()将在不查询数据库的情况下为您提供结果对象在内存中):

>>> session.query(User).get(1)
<User(u'ed', u'Ed Jones')>

最好使用这个:

>>> session.query(User.name).filter(User.id == 1).first()
[SQL]: SELECT user.name AS user_name
FROM user
WHERE user.id = ?
 LIMIT ? OFFSET ?
[SQL]: (1, 1, 0)
(u'Edwardo',)

get() is not as your expected sometimes:

if your transaction was done:

>>> session.query(User).get(1)
[SQL]: BEGIN (implicit)
[SQL]: SELECT user.id AS user_id, user.name AS user_name, user.fullname AS user_fullname
FROM user
WHERE user.id = ?
[SQL]: (1,)
<User(u'ed', u'Ed Jones')>

if you are in a transaction(get() will give you the result object in memory without query the database):

>>> session.query(User).get(1)
<User(u'ed', u'Ed Jones')>

better to use this:

>>> session.query(User.name).filter(User.id == 1).first()
[SQL]: SELECT user.name AS user_name
FROM user
WHERE user.id = ?
 LIMIT ? OFFSET ?
[SQL]: (1, 1, 0)
(u'Edwardo',)

回答 3

如果您使用的tables reflection话,可能会遇到给定的解决方案问题。(这里的先前解决方案对我不起作用)。

我最终使用的是:

session.query(object._class_).get(id)

object是通过反射从数据库中检索到的,这就是为什么需要使用.__class__

我希望这有帮助。

If you use tables reflection you might have problems with the solutions given. (The previous solutions here didn’t work for me).

What I ended up using was:

session.query(object._class_).get(id)

(object was retrieved by reflection from the database, this is why you need to use .__class__)

I hope this helps.


回答 4

首先,应将其设置id为主键。
然后您可以使用该query.get()方法来查询对象id已经是主键的对象。

由于该query.get()方法通过主键查询对象。
Flask-SQLAlchemy文档推断

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
db = SQLAlchemy()
db.init_app(app)

class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)

def test():
    id = 1
    user = User.query.get(id)

First, you should set id as the primary key.
Then you could use the query.get() method to query objects by id which is already the primary key.

Since the query.get() method to query objects by the primary key.
Inferred from Flask-SQLAlchemy documentation

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
db = SQLAlchemy()
db.init_app(app)

class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)

def test():
    id = 1
    user = User.query.get(id)

Django查询中的“ LIKE”等效SQL

问题:Django查询中的“ LIKE”等效SQL

django中的此SQL语句等效于什么?

SELECT * FROM table_name WHERE string LIKE pattern;

如何在Django中实现呢?我试过了

result = table.objects.filter( pattern in string )

但这没有用。我该如何实施?

What is the equivalent of this SQL statement in django?

SELECT * FROM table_name WHERE string LIKE pattern;

How do I implement this in django? I tried

result = table.objects.filter( pattern in string )

But that did not work. How do i implement this?


回答 0

使用__contains__icontains(不区分大小写):

result = table.objects.filter(string__contains='pattern')

SQL等效为

SELECT ... WHERE string LIKE '%pattern%';

Use __contains or __icontains (case-insensitive):

result = table.objects.filter(string__contains='pattern')

The SQL equivalent is

SELECT ... WHERE string LIKE '%pattern%';

回答 1

由falsetru提及的包含和icontains使查询类似 SELECT ... WHERE headline LIKE '%pattern%

与它们一起,您可能需要具有类似行为的这些: startswithistartswithendswithiendswith

制造

SELECT ... WHERE headline LIKE 'pattern%

要么

SELECT ... WHERE headline LIKE '%pattern

contains and icontains mentioned by falsetru make queries like SELECT ... WHERE headline LIKE '%pattern%

Along with them, you might need these ones with similar behavior: startswith, istartswith, endswith, iendswith

making

SELECT ... WHERE headline LIKE 'pattern%

or

SELECT ... WHERE headline LIKE '%pattern


回答 2

result = table.objects.filter(string__icontains='pattern')

不区分大小写的字段中搜索字符串。

result = table.objects.filter(string__icontains='pattern')

Case insensitive search for string in a field.


回答 3

为了像sql LIKE’%pattern%’语句中那样保留单词的顺序,我使用了iregex,例如:

qs = table.objects.filter(string__iregex=pattern.replace(' ', '.*'))

字符串方法是不可变的,因此您的模式变量不会更改,并且使用。*时,您会寻找0个或多个出现的任何字符,但要换行。

通过使用以下代码遍历模式词:

qs = table.objects
for word in pattern.split(' '):
    qs = qs.filter(string__icontains=word)

对于某些可能有用的人,将不会保留模式中单词的顺序,但是在尝试模仿sql like语句的情况下,我将使用第一个选项。

In order to preserve the order of the words as in the sql LIKE ‘%pattern%’ statement I use iregex, for example:

qs = table.objects.filter(string__iregex=pattern.replace(' ', '.*'))

string methods are immutable so your pattern variable will not change and with .* you’ll be looking for 0 or more occurrences of any character but break lines.

By using the following to iterate over the pattern words:

qs = table.objects
for word in pattern.split(' '):
    qs = qs.filter(string__icontains=word)

the order of the words in your pattern will not be preserved, for some people that could work but in the case of trying to mimic the sql like statement I’ll use the first option.


回答 4

这可以通过Django的自定义查询来完成。我已经将查询转换为Django式lookup应用程序。安装后__like,使用%和查找_将启用通配符。

该应用程序中所有必需的代码是:

from django.db.models import Lookup
from django.db.models.fields import Field


@Field.register_lookup
class Like(Lookup):
    lookup_name = 'like'

    def as_sql(self, compiler, connection):
        lhs, lhs_params = self.process_lhs(compiler, connection)
        rhs, rhs_params = self.process_rhs(compiler, connection)
        params = lhs_params + rhs_params
        return '%s LIKE %s' % (lhs, rhs), params

This can be done with Django’s custom lookups. I have made the lookup into a Django-like-lookup application. After installing it the __like lookup with the % and _ wildcards will be enabled.

All the necessary code in the application is:

from django.db.models import Lookup
from django.db.models.fields import Field


@Field.register_lookup
class Like(Lookup):
    lookup_name = 'like'

    def as_sql(self, compiler, connection):
        lhs, lhs_params = self.process_lhs(compiler, connection)
        rhs, rhs_params = self.process_rhs(compiler, connection)
        params = lhs_params + rhs_params
        return '%s LIKE %s' % (lhs, rhs), params

SQLAlchemy:如何过滤日期字段?

问题:SQLAlchemy:如何过滤日期字段?

这是模型:

class User(Base):
    ...
    birthday = Column(Date, index=True)   #in database it's like '1987-01-17'
    ...

我想在两个日期之间进行过滤,例如选择间隔18-30年的所有用户。

如何用SQLAlchemy实现它?

我想:

query = DBSession.query(User).filter(
    and_(User.birthday >= '1988-01-17', User.birthday <= '1985-01-17')
) 

# means age >= 24 and age <= 27

我知道这是不正确的,但是该怎么做正确呢?

Here is model:

class User(Base):
    ...
    birthday = Column(Date, index=True)   #in database it's like '1987-01-17'
    ...

I want to filter between two dates, for example to choose all users in interval 18-30 years.

How to implement it with SQLAlchemy?

I think of:

query = DBSession.query(User).filter(
    and_(User.birthday >= '1988-01-17', User.birthday <= '1985-01-17')
) 

# means age >= 24 and age <= 27

I know this is not correct, but how to do correct?


回答 0

实际上,除了错别字,您的查询是正确的:您的过滤器排除了所有记录:您应该更改<=for >=,反之亦然:

qry = DBSession.query(User).filter(
        and_(User.birthday <= '1988-01-17', User.birthday >= '1985-01-17'))
# or same:
qry = DBSession.query(User).filter(User.birthday <= '1988-01-17').\
        filter(User.birthday >= '1985-01-17')

您也可以使用between

qry = DBSession.query(User).filter(User.birthday.between('1985-01-17', '1988-01-17'))

In fact, your query is right except for the typo: your filter is excluding all records: you should change the <= for >= and vice versa:

qry = DBSession.query(User).filter(
        and_(User.birthday <= '1988-01-17', User.birthday >= '1985-01-17'))
# or same:
qry = DBSession.query(User).filter(User.birthday <= '1988-01-17').\
        filter(User.birthday >= '1985-01-17')

Also you can use between:

qry = DBSession.query(User).filter(User.birthday.between('1985-01-17', '1988-01-17'))

回答 1

from app import SQLAlchemyDB as db

Chance.query.filter(Chance.repo_id==repo_id, 
                    Chance.status=="1", 
                    db.func.date(Chance.apply_time)<=end, 
                    db.func.date(Chance.apply_time)>=start).count()

它等于:

select
   count(id)
from
   Chance
where
   repo_id=:repo_id 
   and status='1'
   and date(apple_time) <= end
   and date(apple_time) >= start

希望可以帮助你。

from app import SQLAlchemyDB as db

Chance.query.filter(Chance.repo_id==repo_id, 
                    Chance.status=="1", 
                    db.func.date(Chance.apply_time)<=end, 
                    db.func.date(Chance.apply_time)>=start).count()

it is equal to:

select
   count(id)
from
   Chance
where
   repo_id=:repo_id 
   and status='1'
   and date(apple_time) <= end
   and date(apple_time) >= start

wish can help you.


回答 2

如果要获得整个期间:

    from sqlalchemy import and_, func

    query = DBSession.query(User).filter(and_(func.date(User.birthday) >= '1985-01-17'),\
                                              func.date(User.birthday) <= '1988-01-17'))

这表示范围:1985-01-17 00 : 00-1988-01-17 23:59

if you want to get the whole period:

    from sqlalchemy import and_, func

    query = DBSession.query(User).filter(and_(func.date(User.birthday) >= '1985-01-17'),\
                                              func.date(User.birthday) <= '1988-01-17'))

That means range: 1985-01-17 00:001988-01-17 23:59


如何从SQLAlchemy表达式获取原始的编译SQL查询?

问题:如何从SQLAlchemy表达式获取原始的编译SQL查询?

我有一个SQLAlchemy查询对象,想要获取已绑定所有参数的已编译SQL语句的文本(例如,否%s或其他变量正等待语句编译器或MySQLdb方言引擎的绑定等)。

调用str()查询将显示如下内容:

SELECT id WHERE date_added <= %s AND date_added >= %s ORDER BY count DESC

我试着在query._params中查找,但这是一个空字典。我使用装饰器的这个示例sqlalchemy.ext.compiler.compiles编写了自己的编译器,但即使那里的语句仍然有%s我想要的数据。

我无法弄清楚何时混入参数来创建查询。在检查查询对象时,它们始终是一个空字典(尽管查询执行得很好,并且当您打开echo记录时引擎会打印出来)。

我开始收到消息,SQLAlchemy不想让我知道底层查询,因为它破坏了表达式API接口的所有不同DB-API的一般性质。我不在乎查询是否在我发现查询之前就已经执行了;我只是想知道!

I have a SQLAlchemy query object and want to get the text of the compiled SQL statement, with all its parameters bound (e.g. no %s or other variables waiting to be bound by the statement compiler or MySQLdb dialect engine, etc).

Calling str() on the query reveals something like this:

SELECT id WHERE date_added <= %s AND date_added >= %s ORDER BY count DESC

I’ve tried looking in query._params but it’s an empty dict. I wrote my own compiler using this example of the sqlalchemy.ext.compiler.compiles decorator but even the statement there still has %s where I want data.

I can’t quite figure out when my parameters get mixed in to create the query; when examining the query object they’re always an empty dictionary (though the query executes fine and the engine prints it out when you turn echo logging on).

I’m starting to get the message that SQLAlchemy doesn’t want me to know the underlying query, as it breaks the general nature of the expression API’s interface all the different DB-APIs. I don’t mind if the query gets executed before I found out what it was; I just want to know!


回答 0

博客提供了更新的答案。

引用博客文章中的内容,这对我来说是建议和有效的。

>>> from sqlalchemy.dialects import postgresql
>>> print str(q.statement.compile(dialect=postgresql.dialect()))

其中q定义为:

>>> q = DBSession.query(model.Name).distinct(model.Name.value) \
             .order_by(model.Name.value)

或者只是任何一种session.query()。

感谢Nicolas Cadou的回答!希望对其他在这里搜索的人有所帮助。

This blog provides an updated answer.

Quoting from the blog post, this is suggested and worked for me.

>>> from sqlalchemy.dialects import postgresql
>>> print str(q.statement.compile(dialect=postgresql.dialect()))

Where q is defined as:

>>> q = DBSession.query(model.Name).distinct(model.Name.value) \
             .order_by(model.Name.value)

Or just any kind of session.query().

Thanks to Nicolas Cadou for the answer! I hope it helps others who come searching here.


回答 1

文档用于literal_binds打印q包含参数的查询:

print(q.statement.compile(compile_kwargs={"literal_binds": True}))

上面的方法有一个警告:仅基本类型(例如int和字符串)才支持该方法,此外,如果直接使用没有预设值的bindparam(),则也不能将其字符串化。

该文档还发出以下警告:

切勿将此技术与从不受信任的输入(例如从Web表单或其他用户输入应用程序)接收到的字符串内容一起使用。SQLAlchemy的将Python值强制转换为直接SQL字符串值的功能对于不受信任的输入是不安全的,并且无法验证传递的数据类型。以编程方式对关系数据库调用非DDL SQL语句时,请始终使用绑定参数。

The documentation uses literal_binds to print a query q including parameters:

print(q.statement.compile(compile_kwargs={"literal_binds": True}))

the above approach has the caveats that it is only supported for basic types, such as ints and strings, and furthermore if a bindparam() without a pre-set value is used directly, it won’t be able to stringify that either.

The documentation also issues this warning:

Never use this technique with string content received from untrusted input, such as from web forms or other user-input applications. SQLAlchemy’s facilities to coerce Python values into direct SQL string values are not secure against untrusted input and do not validate the type of data being passed. Always use bound parameters when programmatically invoking non-DDL SQL statements against a relational database.


回答 2

这应该适用于Sqlalchemy> = 0.6

from sqlalchemy.sql import compiler

from psycopg2.extensions import adapt as sqlescape
# or use the appropiate escape function from your db driver

def compile_query(query):
    dialect = query.session.bind.dialect
    statement = query.statement
    comp = compiler.SQLCompiler(dialect, statement)
    comp.compile()
    enc = dialect.encoding
    params = {}
    for k,v in comp.params.iteritems():
        if isinstance(v, unicode):
            v = v.encode(enc)
        params[k] = sqlescape(v)
    return (comp.string.encode(enc) % params).decode(enc)

This should work with Sqlalchemy >= 0.6

from sqlalchemy.sql import compiler

from psycopg2.extensions import adapt as sqlescape
# or use the appropiate escape function from your db driver

def compile_query(query):
    dialect = query.session.bind.dialect
    statement = query.statement
    comp = compiler.SQLCompiler(dialect, statement)
    comp.compile()
    enc = dialect.encoding
    params = {}
    for k,v in comp.params.iteritems():
        if isinstance(v, unicode):
            v = v.encode(enc)
        params[k] = sqlescape(v)
    return (comp.string.encode(enc) % params).decode(enc)

回答 3

对于MySQLdb后端,我稍微修改了albertov的出色答案(非常感谢!)。我敢肯定,可以将它们合并以检查是否存在comp.positionalTrue但这超出了此问题的范围。

def compile_query(query):
    from sqlalchemy.sql import compiler
    from MySQLdb.converters import conversions, escape

    dialect = query.session.bind.dialect
    statement = query.statement
    comp = compiler.SQLCompiler(dialect, statement)
    comp.compile()
    enc = dialect.encoding
    params = []
    for k in comp.positiontup:
        v = comp.params[k]
        if isinstance(v, unicode):
            v = v.encode(enc)
        params.append( escape(v, conversions) )
    return (comp.string.encode(enc) % tuple(params)).decode(enc)

For the MySQLdb backend I modified albertov’s awesome answer (thanks so much!) a bit. I’m sure they could be merged to check if comp.positional was True but that’s slightly beyond the scope of this question.

def compile_query(query):
    from sqlalchemy.sql import compiler
    from MySQLdb.converters import conversions, escape

    dialect = query.session.bind.dialect
    statement = query.statement
    comp = compiler.SQLCompiler(dialect, statement)
    comp.compile()
    enc = dialect.encoding
    params = []
    for k in comp.positiontup:
        v = comp.params[k]
        if isinstance(v, unicode):
            v = v.encode(enc)
        params.append( escape(v, conversions) )
    return (comp.string.encode(enc) % tuple(params)).decode(enc)

回答 4

事实是,sqlalchemy永远不会将数据与查询混合在一起。查询和数据分别传递到基础数据库驱动程序-数据插值发生在数据库中。

Sqlalchemy如您所见将查询传递str(myquery)给数据库,并且值将进入一个单独的元组。

您可以使用一些方法自己在查询中插入数据(如下面的albertov所建议),但这与sqlalchemy正在执行的事情不同。

Thing is, sqlalchemy never mixes the data with your query. The query and the data are passed separately to your underlying database driver – the interpolation of data happens in your database.

Sqlalchemy passes the query as you’ve seen in str(myquery) to the database, and the values will go in a separate tuple.

You could use some approach where you interpolate the data with the query yourself (as albertov suggested below), but that’s not the same thing that sqlalchemy is executing.


回答 5

首先,让我先说一下,我假设您这样做主要是出于调试目的-我不建议您尝试尝试在SQLAlchemy Fluent API之外修改语句。

不幸的是,似乎没有一种简单的方法可以显示包含查询参数的已编译语句。SQLAlchemy实际上并未将参数放入语句中-它们已作为字典传递给数据库引擎。这使特定于数据库的库可以处理诸如转义特殊字符的操作,以避免SQL注入。

但是您可以很容易地在两步过程中完成此操作。要获取该语句,您可以按照显示的操作进行操作,只需打印查询:

>>> print(query)
SELECT field_1, field_2 FROM table WHERE id=%s;

使用query.statement可以更进一步,以查看参数名称。请注意:id_1下面和%s上面的内容-在这个非常简单的示例中并不是真正的问题,但是在更复杂的语句中可能是关键。

>>> print(query.statement)
>>> print(query.statement.compile()) # seems to be equivalent, you can also
                                     # pass in a dialect if you want
SELECT field_1, field_2 FROM table WHERE id=:id_1;

然后,您可以通过获取params已编译语句的属性来获取参数的实际值:

>>> print(query.statement.compile().params)
{u'id_1': 1} 

至少对MySQL后端有用。我希望它对于PostgreSQL也足够通用,无需使用psycopg2

First let me preface by saying that I assume you’re doing this mainly for debugging purposes — I wouldn’t recommend trying to modify the statement outside of the SQLAlchemy fluent API.

Unfortunately there doesn’t seem to be a simple way to show the compiled statement with the query parameters included. SQLAlchemy doesn’t actually put the parameters into the statement — they’re passed into the database engine as a dictionary. This lets the database-specific library handle things like escaping special characters to avoid SQL injection.

But you can do this in a two-step process reasonably easily. To get the statement, you can do as you’ve already shown, and just print the query:

>>> print(query)
SELECT field_1, field_2 FROM table WHERE id=%s;

You can get one step closer with query.statement, to see the parameter names. Note :id_1 below vs %s above — not really a problem in this very simple example, but could be key in a more complicated statement.

>>> print(query.statement)
>>> print(query.statement.compile()) # seems to be equivalent, you can also
                                     # pass in a dialect if you want
SELECT field_1, field_2 FROM table WHERE id=:id_1;

Then, you can get the actual values of the parameters by getting the params property of the compiled statement:

>>> print(query.statement.compile().params)
{u'id_1': 1} 

This worked for a MySQL backend at least; I would expect it’s also general enough for PostgreSQL without needing to use psycopg2.


回答 6

对于使用psycopg2的Postgresql后端,您可以侦听该do_execute事件,然后使用游标,语句并键入强制参数以及Cursor.mogrify()内联参数。您可以返回True以防止实际执行查询。

import sqlalchemy

class QueryDebugger(object):
    def __init__(self, engine, query):
        with engine.connect() as connection:
            try:
                sqlalchemy.event.listen(engine, "do_execute", self.receive_do_execute)
                connection.execute(query)
            finally:
                sqlalchemy.event.remove(engine, "do_execute", self.receive_do_execute)

    def receive_do_execute(self, cursor, statement, parameters, context):
        self.statement = statement
        self.parameters = parameters
        self.query = cursor.mogrify(statement, parameters)
        # Don't actually execute
        return True

用法示例:

>>> engine = sqlalchemy.create_engine("postgresql://postgres@localhost/test")
>>> metadata = sqlalchemy.MetaData()
>>> users = sqlalchemy.Table('users', metadata, sqlalchemy.Column("_id", sqlalchemy.String, primary_key=True), sqlalchemy.Column("document", sqlalchemy.dialects.postgresql.JSONB))
>>> s = sqlalchemy.select([users.c.document.label("foobar")]).where(users.c.document.contains({"profile": {"iid": "something"}}))
>>> q = QueryDebugger(engine, s)
>>> q.query
'SELECT users.document AS foobar \nFROM users \nWHERE users.document @> \'{"profile": {"iid": "something"}}\''
>>> q.statement
'SELECT users.document AS foobar \nFROM users \nWHERE users.document @> %(document_1)s'
>>> q.parameters
{'document_1': '{"profile": {"iid": "something"}}'}

For postgresql backend using psycopg2, you can listen for the do_execute event, then use the cursor, statement and type coerced parameters along with Cursor.mogrify() to inline the parameters. You can return True to prevent actual execution of the query.

import sqlalchemy

class QueryDebugger(object):
    def __init__(self, engine, query):
        with engine.connect() as connection:
            try:
                sqlalchemy.event.listen(engine, "do_execute", self.receive_do_execute)
                connection.execute(query)
            finally:
                sqlalchemy.event.remove(engine, "do_execute", self.receive_do_execute)

    def receive_do_execute(self, cursor, statement, parameters, context):
        self.statement = statement
        self.parameters = parameters
        self.query = cursor.mogrify(statement, parameters)
        # Don't actually execute
        return True

Sample usage:

>>> engine = sqlalchemy.create_engine("postgresql://postgres@localhost/test")
>>> metadata = sqlalchemy.MetaData()
>>> users = sqlalchemy.Table('users', metadata, sqlalchemy.Column("_id", sqlalchemy.String, primary_key=True), sqlalchemy.Column("document", sqlalchemy.dialects.postgresql.JSONB))
>>> s = sqlalchemy.select([users.c.document.label("foobar")]).where(users.c.document.contains({"profile": {"iid": "something"}}))
>>> q = QueryDebugger(engine, s)
>>> q.query
'SELECT users.document AS foobar \nFROM users \nWHERE users.document @> \'{"profile": {"iid": "something"}}\''
>>> q.statement
'SELECT users.document AS foobar \nFROM users \nWHERE users.document @> %(document_1)s'
>>> q.parameters
{'document_1': '{"profile": {"iid": "something"}}'}

回答 7

以下解决方案使用SQLAlchemy表达式语言并与SQLAlchemy 1.1一起使用。该解决方案不将参数与查询混合(按原始作者的要求),但是提供了一种使用SQLAlchemy模型为不同SQL方言生成SQL查询字符串和参数字典的方法。该示例基于教程http://docs.sqlalchemy.org/en/rel_1_0/core/tutorial.html

上课了

from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class foo(Base):
    __tablename__ = 'foo'
    id = Column(Integer(), primary_key=True)
    name = Column(String(80), unique=True)
    value = Column(Integer())

我们可以使用select函数生成查询语句。

from sqlalchemy.sql import select    
statement = select([foo.name, foo.value]).where(foo.value > 0)

接下来,我们可以将语句编译成查询对象。

query = statement.compile()

默认情况下,该语句使用与SQLite和Oracle等SQL数据库兼容的基本“命名”实现进行编译。如果需要指定方言(例如PostgreSQL),则可以执行

from sqlalchemy.dialects import postgresql
query = statement.compile(dialect=postgresql.dialect())

或者,如果您想将方言明确指定为SQLite,则可以将参数样式从“ qmark”更改为“ named”。

from sqlalchemy.dialects import sqlite
query = statement.compile(dialect=sqlite.dialect(paramstyle="named"))

从查询对象中,我们可以提取查询字符串和查询参数

query_str = str(query)
query_params = query.params

最后执行查询。

conn.execute( query_str, query_params )

The following solution uses the SQLAlchemy Expression Language and works with SQLAlchemy 1.1. This solution does not mix the parameters with the query (as requested by the original author), but provides a way of using SQLAlchemy models to generate SQL query strings and parameter dictionaries for different SQL dialects. The example is based on the tutorial http://docs.sqlalchemy.org/en/rel_1_0/core/tutorial.html

Given the class,

from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class foo(Base):
    __tablename__ = 'foo'
    id = Column(Integer(), primary_key=True)
    name = Column(String(80), unique=True)
    value = Column(Integer())

we can produce a query statement using the select function.

from sqlalchemy.sql import select    
statement = select([foo.name, foo.value]).where(foo.value > 0)

Next, we can compile the statement into a query object.

query = statement.compile()

By default, the statement is compiled using a basic ‘named’ implementation that is compatible with SQL databases such as SQLite and Oracle. If you need to specify a dialect such as PostgreSQL, you can do

from sqlalchemy.dialects import postgresql
query = statement.compile(dialect=postgresql.dialect())

Or if you want to explicitly specify the dialect as SQLite, you can change the paramstyle from ‘qmark’ to ‘named’.

from sqlalchemy.dialects import sqlite
query = statement.compile(dialect=sqlite.dialect(paramstyle="named"))

From the query object, we can extract the query string and query parameters

query_str = str(query)
query_params = query.params

and finally execute the query.

conn.execute( query_str, query_params )

回答 8

您可以使用ConnectionEvents系列的事件:after_cursor_executebefore_cursor_execute

@zzzeek提供的sqlalchemy UsageRecipes中,您可以找到以下示例:

Profiling

...
@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement,
                        parameters, context, executemany):
    conn.info.setdefault('query_start_time', []).append(time.time())
    logger.debug("Start Query: %s" % statement % parameters)
...

在这里您可以访问您的对帐单

You can use events from ConnectionEvents family: after_cursor_execute or before_cursor_execute.

In sqlalchemy UsageRecipes by @zzzeek you can find this example:

Profiling

...
@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement,
                        parameters, context, executemany):
    conn.info.setdefault('query_start_time', []).append(time.time())
    logger.debug("Start Query: %s" % statement % parameters)
...

Here you can get access to your statement


回答 9

因此,将这些不同答案的很多点放在一起,我得出了我需要的东西:一组简单的代码可以插入,偶尔但可靠地(即处理所有数据类型)获取发送给我的准确的,已编译的SQL通过查询查询本身的Postgres后端:

from sqlalchemy.dialects import postgresql

query = [ .... some ORM query .... ]

compiled_query = query.statement.compile(
    dialect=postgresql.dialect()
)
mogrified_query = session.connection().connection.cursor().mogrify(
    str(compiled_query),
    compiled_query.params
)

print("compiled SQL = {s}".format(mogrified_query.decode())

So, putting together a lot of little bits of these different answers, I came up with what I needed: a simple set of code to drop in and occasionally but reliably (i.e. handles all data types) grab the exact, compiled SQL sent to my Postgres backend by just interrogating the query itself:

from sqlalchemy.dialects import postgresql

query = [ .... some ORM query .... ]

compiled_query = query.statement.compile(
    dialect=postgresql.dialect(),
    compile_kwargs={"literal_binds": True}
)
mogrified_query = session.connection().connection.cursor().mogrify(
    str(compiled_query),
    compiled_query.params
)

print("compiled SQL = {s}".format(mogrified_query.decode())

回答 10

我认为.statement可能会解决问题:http ://docs.sqlalchemy.org/en/latest/orm/query.html?highlight=query

>>> local_session.query(sqlalchemy_declarative.SomeTable.text).statement
<sqlalchemy.sql.annotation.AnnotatedSelect at 0x6c75a20; AnnotatedSelectobject>
>>> x=local_session.query(sqlalchemy_declarative.SomeTable.text).statement
>>> print(x)
SELECT sometable.text 
FROM sometable

I think .statement would possibly do the trick: http://docs.sqlalchemy.org/en/latest/orm/query.html?highlight=query

>>> local_session.query(sqlalchemy_declarative.SomeTable.text).statement
<sqlalchemy.sql.annotation.AnnotatedSelect at 0x6c75a20; AnnotatedSelectobject>
>>> x=local_session.query(sqlalchemy_declarative.SomeTable.text).statement
>>> print(x)
SELECT sometable.text 
FROM sometable

如何在Flask-SQLAlchemy应用中执行原始SQL

问题:如何在Flask-SQLAlchemy应用中执行原始SQL

如何在SQLAlchemy中执行原始SQL?

我有一个在烧瓶上运行的python Web应用程序,并通过SQLAlchemy连接到数据库。

我需要一种运行原始SQL的方法。该查询涉及多个表联接以及内联视图。

我试过了:

connection = db.session.connection()
connection.execute( <sql here> )

但是我不断收到网关错误。

How do you execute raw SQL in SQLAlchemy?

I have a python web app that runs on flask and interfaces to the database through SQLAlchemy.

I need a way to run the raw SQL. The query involves multiple table joins along with Inline views.

I’ve tried:

connection = db.session.connection()
connection.execute( <sql here> )

But I keep getting gateway errors.


回答 0

你有没有尝试过:

result = db.engine.execute("<sql here>")

要么:

from sqlalchemy import text

sql = text('select name from penguins')
result = db.engine.execute(sql)
names = [row[0] for row in result]
print names

Have you tried:

result = db.engine.execute("<sql here>")

or:

from sqlalchemy import text

sql = text('select name from penguins')
result = db.engine.execute(sql)
names = [row[0] for row in result]
print names

回答 1

SQL Alchemy会话对象具有自己的execute方法:

result = db.session.execute('SELECT * FROM my_table WHERE my_column = :val', {'val': 5})

您的所有应用程序查询都应通过会话对象,无论它们是否是原始SQL。这样可以确保由事务适当地管理查询,该事务允许将同一请求中的多个查询作为一个单元提交或回滚。使用引擎连接进行事务处理将使您面临更大的隐患,即可能很难检测到可能导致数据损坏的错误的风险。每个请求应仅与一个事务相关联,并且使用db.session可以确保您的应用程序是这种情况。

另请注意,它execute是为参数化查询设计的。使用:val示例中的参数作为查询的任何输入,以保护自己免受SQL注入攻击。您可以通过传递a dict作为第二个参数来提供这些参数的值,其中每个键都是在查询中显示的参数名称。参数本身的确切语法可能会有所不同,具体取决于您的数据库,但是所有主要的关系数据库都以某种形式支持它们。

假设这是一个SELECT查询,它将返回一个可迭代RowProxy对象。

您可以使用多种技术访问各个列:

for r in result:
    print(r[0]) # Access by positional index
    print(r['my_column']) # Access by column name as a string
    r_dict = dict(r.items()) # convert to dict keyed by column names

就个人而言,我更喜欢将结果转换为namedtuples:

from collections import namedtuple

Record = namedtuple('Record', result.keys())
records = [Record(*r) for r in result.fetchall()]
for r in records:
    print(r.my_column)
    print(r)

如果您不使用Flask-SQLAlchemy扩展,您仍然可以轻松使用会话:

import sqlalchemy
from sqlalchemy.orm import sessionmaker, scoped_session

engine = sqlalchemy.create_engine('my connection string')
Session = scoped_session(sessionmaker(bind=engine))

s = Session()
result = s.execute('SELECT * FROM my_table WHERE my_column = :val', {'val': 5})

SQL Alchemy session objects have their own execute method:

result = db.session.execute('SELECT * FROM my_table WHERE my_column = :val', {'val': 5})

All your application queries should be going through a session object, whether they’re raw SQL or not. This ensures that the queries are properly managed by a transaction, which allows multiple queries in the same request to be committed or rolled back as a single unit. Going outside the transaction using the engine or the connection puts you at much greater risk of subtle, possibly hard to detect bugs that can leave you with corrupted data. Each request should be associated with only one transaction, and using db.session will ensure this is the case for your application.

Also take note that execute is designed for parameterized queries. Use parameters, like :val in the example, for any inputs to the query to protect yourself from SQL injection attacks. You can provide the value for these parameters by passing a dict as the second argument, where each key is the name of the parameter as it appears in the query. The exact syntax of the parameter itself may be different depending on your database, but all of the major relational databases support them in some form.

Assuming it’s a SELECT query, this will return an iterable of RowProxy objects.

You can access individual columns with a variety of techniques:

for r in result:
    print(r[0]) # Access by positional index
    print(r['my_column']) # Access by column name as a string
    r_dict = dict(r.items()) # convert to dict keyed by column names

Personally, I prefer to convert the results into namedtuples:

from collections import namedtuple

Record = namedtuple('Record', result.keys())
records = [Record(*r) for r in result.fetchall()]
for r in records:
    print(r.my_column)
    print(r)

If you’re not using the Flask-SQLAlchemy extension, you can still easily use a session:

import sqlalchemy
from sqlalchemy.orm import sessionmaker, scoped_session

engine = sqlalchemy.create_engine('my connection string')
Session = scoped_session(sessionmaker(bind=engine))

s = Session()
result = s.execute('SELECT * FROM my_table WHERE my_column = :val', {'val': 5})

回答 2

docs:SQL表达式语言教程-使用文本

例:

from sqlalchemy.sql import text

connection = engine.connect()

# recommended
cmd = 'select * from Employees where EmployeeGroup = :group'
employeeGroup = 'Staff'
employees = connection.execute(text(cmd), group = employeeGroup)

# or - wee more difficult to interpret the command
employeeGroup = 'Staff'
employees = connection.execute(
                  text('select * from Employees where EmployeeGroup = :group'), 
                  group = employeeGroup)

# or - notice the requirement to quote 'Staff'
employees = connection.execute(
                  text("select * from Employees where EmployeeGroup = 'Staff'"))


for employee in employees: logger.debug(employee)
# output
(0, 'Tim', 'Gurra', 'Staff', '991-509-9284')
(1, 'Jim', 'Carey', 'Staff', '832-252-1910')
(2, 'Lee', 'Asher', 'Staff', '897-747-1564')
(3, 'Ben', 'Hayes', 'Staff', '584-255-2631')

docs: SQL Expression Language Tutorial – Using Text

example:

from sqlalchemy.sql import text

connection = engine.connect()

# recommended
cmd = 'select * from Employees where EmployeeGroup = :group'
employeeGroup = 'Staff'
employees = connection.execute(text(cmd), group = employeeGroup)

# or - wee more difficult to interpret the command
employeeGroup = 'Staff'
employees = connection.execute(
                  text('select * from Employees where EmployeeGroup = :group'), 
                  group = employeeGroup)

# or - notice the requirement to quote 'Staff'
employees = connection.execute(
                  text("select * from Employees where EmployeeGroup = 'Staff'"))


for employee in employees: logger.debug(employee)
# output
(0, 'Tim', 'Gurra', 'Staff', '991-509-9284')
(1, 'Jim', 'Carey', 'Staff', '832-252-1910')
(2, 'Lee', 'Asher', 'Staff', '897-747-1564')
(3, 'Ben', 'Hayes', 'Staff', '584-255-2631')

回答 3

你可以使用SELECT SQL查询的结果from_statement(),并text()如图所示这里。您不必以这种方式处理元组。作为User具有表名的类的示例,users您可以尝试,

from sqlalchemy.sql import text
.
.
.
user = session.query(User).from_statement(
    text("SELECT * FROM users where name=:name")).\
    params(name='ed').all()

return user

You can get the results of SELECT SQL queries using from_statement() and text() as shown here. You don’t have to deal with tuples this way. As an example for a class User having the table name users you can try,

from sqlalchemy.sql import text

user = session.query(User).from_statement(
    text("""SELECT * FROM users where name=:name""")
).params(name="ed").all()

return user

回答 4

result = db.engine.execute(text("<sql here>"))

<sql here>除非您处于autocommit模式,否则执行但不提交。因此,插入和更新不会反映在数据库中。

要在更改后提交,请执行

result = db.engine.execute(text("<sql here>").execution_options(autocommit=True))
result = db.engine.execute(text("<sql here>"))

executes the <sql here> but doesn’t commit it unless you’re on autocommit mode. So, inserts and updates wouldn’t reflect in the database.

To commit after the changes, do

result = db.engine.execute(text("<sql here>").execution_options(autocommit=True))

回答 5

这是如何从Flask Shell运行SQL查询的简化答案

首先,映射您的模块(如果您的模块/应用程序是principal文件夹中的manage.py,并且您在UNIX操作系统中),请运行:

export FLASK_APP=manage

运行烧瓶壳

flask shell

导入我们需要的::

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy(app)
from sqlalchemy import text

运行查询:

result = db.engine.execute(text("<sql here>").execution_options(autocommit=True))

这将使用具有应用程序的当前数据库连接。

This is a simplified answer of how to run SQL query from Flask Shell

First, map your module (if your module/app is manage.py in the principal folder and you are in a UNIX Operating system), run:

export FLASK_APP=manage

Run Flask shell

flask shell

Import what we need::

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy(app)
from sqlalchemy import text

Run your query:

result = db.engine.execute(text("<sql here>").execution_options(autocommit=True))

This use the currently database connection which has the application.


回答 6

您是否尝试过按照文档connection.execute(text( <sql here> ), <bind params here> )所述使用和绑定参数?这可以帮助解决许多参数格式设置和性能问题。也许网关错误是超时?绑定参数往往会使复杂的查询执行得更快。

Have you tried using connection.execute(text( <sql here> ), <bind params here> ) and bind parameters as described in the docs? This can help solve many parameter formatting and performance problems. Maybe the gateway error is a timeout? Bind parameters tend to make complex queries execute substantially faster.


回答 7

如果你想避免的元组,另一种方式是通过调用firstoneall方法:

query = db.engine.execute("SELECT * FROM blogs "
                           "WHERE id = 1 ")

assert query.first().name == "Welcome to my blog"

If you want to avoid tuples, another way is by calling the first, one or all methods:

query = db.engine.execute("SELECT * FROM blogs "
                           "WHERE id = 1 ")

assert query.first().name == "Welcome to my blog"

django测试应用程序错误-创建测试数据库时出错:创建数据库的权限被拒绝

问题:django测试应用程序错误-创建测试数据库时出错:创建数据库的权限被拒绝

当我尝试使用命令测试任何应用程序时(当我尝试使用使用此命令的结构来部署myproject时,我注意到了它):

python manage.py test appname

我收到此错误:

Creating test database for alias 'default'...
Got an error creating the test database: permission denied to create database

Type 'yes' if you would like to try deleting the test database 'test_finance', or 'no' to cancel

syncdb命令似乎起作用。我在settings.py中的数据库设置:

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql_psycopg2', # Add 'postgresql_psycopg2', 'mysql', 'sqlite3' or 'oracle'.
        'NAME': 'finance',                      # Or path to database file if using sqlite3.
        'USER': 'django',                      # Not used with sqlite3.
        'PASSWORD': 'mydb123',                  # Not used with sqlite3.
        'HOST': '127.0.0.1',                      # Set to empty string for localhost. Not used with sqlite3.
        'PORT': '',                      # Set to empty string for default. Not used with sqlite3.
    }
}

When I try to test any app with command (I noticed it when I tried to deploy myproject using fabric, which uses this command):

python manage.py test appname

I get this error:

Creating test database for alias 'default'...
Got an error creating the test database: permission denied to create database

Type 'yes' if you would like to try deleting the test database 'test_finance', or 'no' to cancel

syncdb command seems to work. My database settings in settings.py:

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql_psycopg2', # Add 'postgresql_psycopg2', 'mysql', 'sqlite3' or 'oracle'.
        'NAME': 'finance',                      # Or path to database file if using sqlite3.
        'USER': 'django',                      # Not used with sqlite3.
        'PASSWORD': 'mydb123',                  # Not used with sqlite3.
        'HOST': '127.0.0.1',                      # Set to empty string for localhost. Not used with sqlite3.
        'PORT': '',                      # Set to empty string for default. Not used with sqlite3.
    }
}

回答 0

Django运行测试套件时,在您的情况下,它将创建一个新数据库test_finance。具有用户名的postgres用户django无权创建数据库,因此出现错误消息。

当您运行migrate或时syncdb,Django不会尝试创建finance数据库,因此不会出现任何错误。

您可以通过以超级用户身份在postgres shell中运行以下命令来向django用户添加createdb权限(此堆栈溢出答案的提示)。

=> ALTER USER django CREATEDB;

注意:ALTER USER <username> CREATEDB;命令中使用的用户名需要与Django设置文件中的数据库用户匹配。在这种情况下,原始张贴者将用户作为django上述答案。

When Django runs the test suite, it creates a new database, in your case test_finance. The postgres user with username django does not have permission to create a database, hence the error message.

When you run migrate or syncdb, Django does not try to create the finance database, so you don’t get any errors.

You can add the createdb permission to the django user by running the following command in the postgres shell as a superuser (hat tip to this stack overflow answer).

=> ALTER USER django CREATEDB;

Note: The username used in the ALTER USER <username> CREATEDB; command needs to match the database user in your Django settings files. In this case, the original poster, had the user as django the above answer.


回答 1

我找到了解决您问题的有趣方法。
实际上,对于MySQL,您可以授予不存在的数据库特权。
因此,您可以在设置中为测试数据库添加名称“ test_finance”:

    DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql_psycopg2', # Add 'postgresql_psycopg2', 'mysql', 'sqlite3' or 'oracle'.
        'NAME': 'finance',                      # Or path to database file if using sqlite3.
        'USER': 'django',                      # Not used with sqlite3.
        'PASSWORD': 'mydb123',                  # Not used with sqlite3.
        'HOST': '127.0.0.1',                      # Set to empty string for localhost. Not used with sqlite3.
        'PORT': '',                      # Set to empty string for default. Not used with sqlite3.
        'TEST': {
            'NAME': 'test_finance',
        },
    }
}

以root用户身份启动MySQL Shell:

mysql -u root -p

现在将所有特权授予该MySQL中不存在的数据库:

GRANT ALL PRIVILEGES ON test_finance.* TO 'django'@'localhost';

现在,Django将毫无问题地开始测试。

I have found interesting solution to your problem.
In fact for MySQL you can grant privileges for non-existing database.
So you can add name ‘test_finance’ for your test database in your settings:

    DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql_psycopg2', # Add 'postgresql_psycopg2', 'mysql', 'sqlite3' or 'oracle'.
        'NAME': 'finance',                      # Or path to database file if using sqlite3.
        'USER': 'django',                      # Not used with sqlite3.
        'PASSWORD': 'mydb123',                  # Not used with sqlite3.
        'HOST': '127.0.0.1',                      # Set to empty string for localhost. Not used with sqlite3.
        'PORT': '',                      # Set to empty string for default. Not used with sqlite3.
        'TEST': {
            'NAME': 'test_finance',
        },
    }
}

start MySQL shell as the root user:

mysql -u root -p

and now grant all privileges to this non-existing database in MySQL:

GRANT ALL PRIVILEGES ON test_finance.* TO 'django'@'localhost';

Now Django will start tests without any problems.


回答 2

对于Postgres,用户必须具有createdb权限。

ALTER ROLE miriam CREATEDB;

请参阅此文档:https : //docs.djangoproject.com/zh-CN/2.0/topics/testing/overview/#the-test-database

In the case of Postgres, the user must have createdb permission.

ALTER ROLE miriam CREATEDB;

See this documentation: https://docs.djangoproject.com/en/2.0/topics/testing/overview/#the-test-database


回答 3

如果数据库是mysql,那么这两个更改将完成任务。

1.打开mysite / mysite / settings.py

您的数据库设置应具有一个额外的TEST块,如projectname_test所示。

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'myproject',
        'USER': 'chandan',
        'PASSWORD': 'root',
        'HOST': 'localhost',
        'PORT': '3306',
        'TEST': {
            'NAME': 'myproject_test',
        },
    }
}

2.使用mysql命令提示符mysql工作台键入以下命令,将所有特权授予settings.py中指定的用户

GRANT ALL PRIVILEGES ON myproject_test.* TO 'chandan'@'localhost';

现在您可以运行 python manage.py test polls

If database is mysql then these two changes will get the things done.

1.Open mysite/mysite/settings.py

Your database settings should have an additional TEST block as shown with projectname_test.

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'myproject',
        'USER': 'chandan',
        'PASSWORD': 'root',
        'HOST': 'localhost',
        'PORT': '3306',
        'TEST': {
            'NAME': 'myproject_test',
        },
    }
}

2.Type the below command using mysql command prompt or mysql workbench to give all privilages to the user specified in settings.py

GRANT ALL PRIVILEGES ON myproject_test.* TO 'chandan'@'localhost';

Now you can run python manage.py test polls.


回答 4

如果您使用的docker-compose是对我有用的,则如下:

ALTER ROLE username CREATEDB;
GRANT ALL PRIVILEGES ON test_database_name.* TO 'username';

要么

ALTER ROLE username CREATEDB;
GRANT ALL PRIVILEGES ON *.* TO 'username'@'%';

我的设置如下所示:

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'database_name',
        'USER': 'username',
        'PASSWORD': 'password',
        'HOST': 'db',
        'PORT': '3306',
    }
}

和我的docker-compose.yml样子如下:

version: '3'
services:
  web:
      build: .
      command: './wait_for_db_and_start_server.sh'
      env_file: env_web
      working_dir: /project_name
      links:
        - db
      volumes:
        - .:/volume_name
      ports:
        - "8000:8000"
      depends_on:
        - db
  db:
    image: mysql:5.7
    restart: always
    env_file: env_db
    working_dir: /db
    volumes:
      - ./Dump.sql:/db/Dump.sql
    ports:
      - "3306:3306"

If you are using docker-compose what worked for me was the following:

ALTER ROLE username CREATEDB;
GRANT ALL PRIVILEGES ON test_database_name.* TO 'username';

or

ALTER ROLE username CREATEDB;
GRANT ALL PRIVILEGES ON *.* TO 'username'@'%';

My settings looks like this:

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'database_name',
        'USER': 'username',
        'PASSWORD': 'password',
        'HOST': 'db',
        'PORT': '3306',
    }
}

and my docker-compose.yml looks as follows:

version: '3'
services:
  web:
      build: .
      command: './wait_for_db_and_start_server.sh'
      env_file: env_web
      working_dir: /project_name
      links:
        - db
      volumes:
        - .:/volume_name
      ports:
        - "8000:8000"
      depends_on:
        - db
  db:
    image: mysql:5.7
    restart: always
    env_file: env_db
    working_dir: /db
    volumes:
      - ./Dump.sql:/db/Dump.sql
    ports:
      - "3306:3306"

回答 5

就我而言,GRANT PRIVILEGES解决方案不适用于Python 3.7.2Django 2.1.7MySQL 5.6.23 …我不知道为什么。

所以我决定使用SQLite作为TEST数据库…

DATABASES = {
    'default': {
        'NAME': 'productiondb',
        'ENGINE': 'mysql.connector.django',   # 'django.db.backends.mysql'
        'USER': '<user>',
        'PASSWORD': '<pass>',
        'HOST': 'localhost',
        'PORT': 3306,
        'OPTIONS': {
            'autocommit': True,
        },
        'TEST': {
            'ENGINE': 'django.db.backends.sqlite3',
            'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
        },
    }
}

此后,TESTS的汽车运行就没有麻烦了:

$ python manage.py test
Creating test database for alias 'default'...
System check identified no issues (0 silenced).

Destroying test database for alias 'default'...
----------------------------------------------------------------------
Ran 0 tests in 0.000s

OK

Process finished with exit code 0

In my case, GRANT PRIVILEGES solutions didn’t work with Python 3.7.2, Django 2.1.7 and MySQL 5.6.23… I don’t know why.

So I decided to use SQLite as a TEST database…

DATABASES = {
    'default': {
        'NAME': 'productiondb',
        'ENGINE': 'mysql.connector.django',   # 'django.db.backends.mysql'
        'USER': '<user>',
        'PASSWORD': '<pass>',
        'HOST': 'localhost',
        'PORT': 3306,
        'OPTIONS': {
            'autocommit': True,
        },
        'TEST': {
            'ENGINE': 'django.db.backends.sqlite3',
            'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
        },
    }
}

After that, TESTS car run without troubles:

$ python manage.py test
Creating test database for alias 'default'...
System check identified no issues (0 silenced).

Destroying test database for alias 'default'...
----------------------------------------------------------------------
Ran 0 tests in 0.000s

OK

Process finished with exit code 0

回答 6

哇,所以结合这里的所有答案和一点点调整,终于使我得到了一个适用于docker-compose,django和postgres的有效解决方案…

首先,noufal valapra给出的postgres命令不正确(或者可能不是最新的),它应该是:

ALTER USER docker WITH CREATEDB;

如果是docker-compose设置,它将进入init.sql文件,这就是我的样子:

CREATE USER docker;
ALTER USER docker WITH CREATEDB;
CREATE DATABASE djangodb;
GRANT ALL PRIVILEGES ON DATABASE djangodb TO docker;

然后用于postgres的Dockerfile如下所示:

FROM postgres:10.1-alpine
COPY init.sql /docker-entrypoint-initdb.d/

然后Django settings.py具有以下条目:

if 'RDS_DB_NAME' in os.environ:
    INTERNAL_DATABASES = {
        'default': {
            'ENGINE': 'django.db.backends.postgresql_psycopg2',
            'NAME': os.environ['RDS_DB_NAME'],
            'USER': os.environ['RDS_USERNAME'],
            'PASSWORD': os.environ['RDS_PASSWORD'],
            'HOST': os.environ['RDS_HOSTNAME'],
            'PORT': os.environ['RDS_PORT'],
        }
    }

和docker-compose看起来像这样:

版本:“ 3.6”

服务:

postgresdb:
  build:
    context: ./
    dockerfile: ./Dockerfile-postgresdb
  volumes:
    - postgresdata:/var/lib/postgresql/data/

django:
  build:
    context: ../
    dockerfile: ./docker/Dockerfile
  environment:
    - RDS_DB_NAME=djangodb
    - RDS_USERNAME=docker
    - RDS_PASSWORD=docker
    - RDS_HOSTNAME=postgresdb
    - RDS_PORT=5432

  stdin_open: true
  tty: true
  depends_on:
    - postgresdb

volumes:
    postgresdata:

Wow so combining all of the answers here with a little tweaking finally got me to a working solution for docker-compose, django, and postgres…

First the postgres command given by noufal valapra is not correct (or maybe just not current), it should be:

ALTER USER docker WITH CREATEDB;

In the case of a docker-compose setup, this will go in the init.sql file, this is what mine looks like:

CREATE USER docker;
ALTER USER docker WITH CREATEDB;
CREATE DATABASE djangodb;
GRANT ALL PRIVILEGES ON DATABASE djangodb TO docker;

Then the Dockerfile for postgres looks like this:

FROM postgres:10.1-alpine
COPY init.sql /docker-entrypoint-initdb.d/

Then the Django settings.py has this entry:

if 'RDS_DB_NAME' in os.environ:
    INTERNAL_DATABASES = {
        'default': {
            'ENGINE': 'django.db.backends.postgresql_psycopg2',
            'NAME': os.environ['RDS_DB_NAME'],
            'USER': os.environ['RDS_USERNAME'],
            'PASSWORD': os.environ['RDS_PASSWORD'],
            'HOST': os.environ['RDS_HOSTNAME'],
            'PORT': os.environ['RDS_PORT'],
        }
    }

and the docker-compose looks like this:

version: ‘3.6’

services:

postgresdb:
  build:
    context: ./
    dockerfile: ./Dockerfile-postgresdb
  volumes:
    - postgresdata:/var/lib/postgresql/data/

django:
  build:
    context: ../
    dockerfile: ./docker/Dockerfile
  environment:
    - RDS_DB_NAME=djangodb
    - RDS_USERNAME=docker
    - RDS_PASSWORD=docker
    - RDS_HOSTNAME=postgresdb
    - RDS_PORT=5432

  stdin_open: true
  tty: true
  depends_on:
    - postgresdb

volumes:
    postgresdata:

回答 7

也许您将测试置于暂停模式或作为后台工作。尝试fg在bash shell中使用命令。

Maybe you put your test in suspended mode or as a backgrounded job. Try with fg command in bash shell.


回答 8

超级用户帐户是保证测试顺利的最简单方法。因此,使django用户su 更简单的方法就是这样做ALTER django WITH SUPERUSER

有关更多信息https://www.postgresql.org/docs/current/sql-alteruser.html

A superuser account is the easiest way to guarantee smooth testing. so a simpler way of making the django user su is to do ALTER django WITH SUPERUSER .

for more information https://www.postgresql.org/docs/current/sql-alteruser.html


Records-面向人类的sql

Records:用于人类的sql™

Records是一个非常简单但功能强大的库,用于对大多数关系数据库进行原始SQL查询

只需编写SQL即可。没有铃声,没有口哨。使用可用的标准工具,这项常见任务可能会出人意料地困难。该库努力使此工作流尽可能简单,同时提供一个优雅的界面来处理您的查询结果

数据库支持包括RedShift、Postgres、MySQL、SQLite、Oracle和MS-SQL(不包括驱动程序)


☤基础知识

我们知道如何编写SQL,所以让我们将一些内容发送到我们的数据库:

import records

db = records.Database('postgres://...')
rows = db.query('select * from active_users')    # or db.query_file('sqls/active-users.sql')

一次抓取一行:

>>> rows[0]
<Record {"username": "model-t", "active": true, "name": "Henry Ford", "user_email": "model-t@gmail.com", "timezone": "2016-02-06 22:28:23.894202"}>

或者迭代它们:

for r in rows:
    print(r.name, r.user_email)

可以通过多种方式访问值:row.user_emailrow['user_email'],或row[3]

还完全支持包含非字母数字字符(如空格)的字段

或存储您的记录集合的副本以供以后参考:

>>> rows.all()
[<Record {"username": ...}>, <Record {"username": ...}>, <Record {"username": ...}>, ...]

如果您只期待一个结果:

>>> rows.first()
<Record {"username": ...}>

其他选项包括rows.as_dict()rows.as_dict(ordered=True)

☤功能

  • 迭代行被缓存以供将来引用
  • $DATABASE_URL环境变量支持
  • 便利性Database.get_table_names方法
  • 用于导出查询的命令行记录工具
  • 安全参数化:Database.query('life=:everything', everything=42)
  • 查询可以作为字符串或文件名传递,支持的参数
  • 交易记录:t = Database.transaction(); t.commit()
  • 批量操作:Database.bulk_query()&Database.bulk_query_file()

唱片公司自豪地由SQLAlchemyTablib

☤数据导出功能

Record还具有完全的Tablib集成功能,允许您通过一行代码将结果导出为CSV、XLS、JSON、HTML表、YAML或Pandas DataFrames。非常适合与朋友共享数据或生成报告

>>> print(rows.dataset)
username|active|name      |user_email       |timezone
--------|------|----------|-----------------|--------------------------
model-t |True  |Henry Ford|model-t@gmail.com|2016-02-06 22:28:23.894202
...

逗号分隔值(CSV)

>>> print(rows.export('csv'))
username,active,name,user_email,timezone
model-t,True,Henry Ford,model-t@gmail.com,2016-02-06 22:28:23.894202
...

YAML Ain‘t Markup Language(YAML)

>>> print(rows.export('yaml'))
- {active: true, name: Henry Ford, timezone: '2016-02-06 22:28:23.894202', user_email: model-t@gmail.com, username: model-t}
...

JavaScript对象表示法(JSON)

>>> print(rows.export('json'))
[{"username": "model-t", "active": true, "name": "Henry Ford", "user_email": "model-t@gmail.com", "timezone": "2016-02-06 22:28:23.894202"}, ...]

Microsoft Excel(xls,xlsx)

with open('report.xls', 'wb') as f:
    f.write(rows.export('xls'))

熊猫数据帧

>>> rows.export('df')
    username  active       name        user_email                   timezone
0    model-t    True Henry Ford model-t@gmail.com 2016-02-06 22:28:23.894202

你说对了。Tablib的所有其他功能也可用,因此您可以对结果进行排序、添加/删除列/行、删除重复项、转置表格、添加分隔符、按列切片数据等

请参阅Tablib Documentation有关更多详细信息,请参阅

☤安装

当然,推荐的安装方法是pipenv

$ pipenv install records[pandas]
✨🍰✨

☤命令行工具

作为额外的奖励,一个records自动包括命令行工具。以下是使用信息的截图:

☤,谢谢你

感谢您借阅本图书馆!我希望你会觉得它有用

当然,总有改进的空间。请随意……open an issue所以我们可以把唱片做得更好、更强、更快

如何在Django queryset中执行OR条件?

问题:如何在Django queryset中执行OR条件?

我想编写一个与此SQL查询等效的Django查询:

SELECT * from user where income >= 5000 or income is NULL.

如何构造Django queryset过滤器?

User.objects.filter(income__gte=5000, income=0)

这是行不通的,因为它AND是过滤器。我想要OR过滤器以获取单个查询集的并集。

I want to write a Django query equivalent to this SQL query:

SELECT * from user where income >= 5000 or income is NULL.

How to construct the Django queryset filter?

User.objects.filter(income__gte=5000, income=0)

This doesn’t work, because it ANDs the filters. I want to OR the filters to get union of individual querysets.


回答 0

from django.db.models import Q
User.objects.filter(Q(income__gte=5000) | Q(income__isnull=True))

通过文档

from django.db.models import Q
User.objects.filter(Q(income__gte=5000) | Q(income__isnull=True))

via Documentation


回答 1

由于QuerySet实现了Python __or__运算符(|)或并集,因此它可以正常工作。正如您所期望的,|二进制运算符返回一个QuerySetso order_by(),,.distinct()其他查询集过滤器可以附加到末尾。

combined_queryset = User.objects.filter(income__gte=5000) | User.objects.filter(income__isnull=True)
ordered_queryset = combined_queryset.order_by('-income')

更新2019-06-20:现在在Django 2.1 QuerySet API参考中已完全记录了该内容。更多历史性讨论可以在DjangoProject票证#21333中找到

Because QuerySets implement the Python __or__ operator (|), or union, it just works. As you’d expect, the | binary operator returns a QuerySet so order_by(), .distinct(), and other queryset filters can be tacked on to the end.

combined_queryset = User.objects.filter(income__gte=5000) | User.objects.filter(income__isnull=True)
ordered_queryset = combined_queryset.order_by('-income')

Update 2019-06-20: This is now fully documented in the Django 2.1 QuerySet API reference. More historic discussion can be found in DjangoProject ticket #21333.


回答 2

现有答案中已经提到了这两种选择:

from django.db.models import Q
q1 = User.objects.filter(Q(income__gte=5000) | Q(income__isnull=True))

q2 = User.objects.filter(income__gte=5000) | User.objects.filter(income__isnull=True)

但是,似乎偏爱哪一个。

关键是它们在SQL级别上是相同的,所以请随意选择您喜欢的任何一个!

Django的ORM食谱在这个比较详细谈到,这里是有关部分:


queryset = User.objects.filter(
        first_name__startswith='R'
    ) | User.objects.filter(
    last_name__startswith='D'
)

导致

In [5]: str(queryset.query)
Out[5]: 'SELECT "auth_user"."id", "auth_user"."password", "auth_user"."last_login",
"auth_user"."is_superuser", "auth_user"."username", "auth_user"."first_name",
"auth_user"."last_name", "auth_user"."email", "auth_user"."is_staff",
"auth_user"."is_active", "auth_user"."date_joined" FROM "auth_user"
WHERE ("auth_user"."first_name"::text LIKE R% OR "auth_user"."last_name"::text LIKE D%)'

qs = User.objects.filter(Q(first_name__startswith='R') | Q(last_name__startswith='D'))

导致

In [9]: str(qs.query)
Out[9]: 'SELECT "auth_user"."id", "auth_user"."password", "auth_user"."last_login",
 "auth_user"."is_superuser", "auth_user"."username", "auth_user"."first_name",
  "auth_user"."last_name", "auth_user"."email", "auth_user"."is_staff",
  "auth_user"."is_active", "auth_user"."date_joined" FROM "auth_user"
  WHERE ("auth_user"."first_name"::text LIKE R% OR "auth_user"."last_name"::text LIKE D%)'

资料来源:django-orm-cookbook


Both options are already mentioned in the existing answers:

from django.db.models import Q
q1 = User.objects.filter(Q(income__gte=5000) | Q(income__isnull=True))

and

q2 = User.objects.filter(income__gte=5000) | User.objects.filter(income__isnull=True)

However, there seems to be some confusion regarding which one is to prefer.

The point is that they are identical on the SQL level, so feel free to pick whichever you like!

The Django ORM Cookbook talks in some detail about this, here is the relevant part:


queryset = User.objects.filter(
        first_name__startswith='R'
    ) | User.objects.filter(
    last_name__startswith='D'
)

leads to

In [5]: str(queryset.query)
Out[5]: 'SELECT "auth_user"."id", "auth_user"."password", "auth_user"."last_login",
"auth_user"."is_superuser", "auth_user"."username", "auth_user"."first_name",
"auth_user"."last_name", "auth_user"."email", "auth_user"."is_staff",
"auth_user"."is_active", "auth_user"."date_joined" FROM "auth_user"
WHERE ("auth_user"."first_name"::text LIKE R% OR "auth_user"."last_name"::text LIKE D%)'

and

qs = User.objects.filter(Q(first_name__startswith='R') | Q(last_name__startswith='D'))

leads to

In [9]: str(qs.query)
Out[9]: 'SELECT "auth_user"."id", "auth_user"."password", "auth_user"."last_login",
 "auth_user"."is_superuser", "auth_user"."username", "auth_user"."first_name",
  "auth_user"."last_name", "auth_user"."email", "auth_user"."is_staff",
  "auth_user"."is_active", "auth_user"."date_joined" FROM "auth_user"
  WHERE ("auth_user"."first_name"::text LIKE R% OR "auth_user"."last_name"::text LIKE D%)'

source: django-orm-cookbook