如何将DataFrame写入postgres表?

问题:如何将DataFrame写入postgres表?

DataFrame.to_sql方法,但仅适用于mysql,sqlite和oracle数据库。我无法传递给此方法postgres连接或sqlalchemy引擎。

There is DataFrame.to_sql method, but it works only for mysql, sqlite and oracle databases. I cant pass to this method postgres connection or sqlalchemy engine.


回答 0

从pandas 0.14(2014年5月发布)开始,支持postgresql。该sql模块现在用于sqlalchemy支持不同的数据库风格。您可以为PostgreSQL数据库传递sqlalchemy引擎(请参阅docs)。例如:

from sqlalchemy import create_engine
engine = create_engine('postgresql://scott:tiger@localhost:5432/mydatabase')
df.to_sql('table_name', engine)

您是正确的,在不支持0.13.1版本的熊猫中,不支持postgresql。如果您需要使用旧版本的熊猫,请使用以下修补版本pandas.io.sqlhttps : //gist.github.com/jorisvandenbossche/10841234
我是在前一段时间写的,所以不能完全保证它始终有效,但是基础应该在那里)。如果将该文件放在工作目录中并导入,那么您应该能够执行此操作(conPostgreSQL连接在哪里):

import sql  # the patched version (file is named sql.py)
sql.write_frame(df, 'table_name', con, flavor='postgresql')

Starting from pandas 0.14 (released end of May 2014), postgresql is supported. The sql module now uses sqlalchemy to support different database flavors. You can pass a sqlalchemy engine for a postgresql database (see docs). E.g.:

from sqlalchemy import create_engine
engine = create_engine('postgresql://scott:tiger@localhost:5432/mydatabase')
df.to_sql('table_name', engine)

You are correct that in pandas up to version 0.13.1 postgresql was not supported. If you need to use an older version of pandas, here is a patched version of pandas.io.sql: https://gist.github.com/jorisvandenbossche/10841234.
I wrote this a time ago, so cannot fully guarantee that it always works, buth the basis should be there). If you put that file in your working directory and import it, then you should be able to do (where con is a postgresql connection):

import sql  # the patched version (file is named sql.py)
sql.write_frame(df, 'table_name', con, flavor='postgresql')

回答 1

更快的选择:

以下代码比df.to_sql方法将您的Pandas DF复制到postgres DB的速度要快得多,并且您不需要任何中间的csv文件来存储df。

根据数据库规范创建引擎。

在您的postgres DB中创建一个表,该表的列数与Dataframe(df)相同。

DF中的数据将插入到您的postgres表中。

from sqlalchemy import create_engine
import psycopg2 
import io

如果要替换表,可以使用df中的标头将其替换为普通的to_sql方法,然后将整个耗时的df加载到DB中。

engine = create_engine('postgresql+psycopg2://username:password@host:port/database')

df.head(0).to_sql('table_name', engine, if_exists='replace',index=False) #truncates the table

conn = engine.raw_connection()
cur = conn.cursor()
output = io.StringIO()
df.to_csv(output, sep='\t', header=False, index=False)
output.seek(0)
contents = output.getvalue()
cur.copy_from(output, 'table_name', null="") # null values become ''
conn.commit()

Faster option:

The following code will copy your Pandas DF to postgres DB much faster than df.to_sql method and you won’t need any intermediate csv file to store the df.

Create an engine based on your DB specifications.

Create a table in your postgres DB that has equal number of columns as the Dataframe (df).

Data in DF will get inserted in your postgres table.

from sqlalchemy import create_engine
import psycopg2 
import io

if you want to replace the table, we can replace it with normal to_sql method using headers from our df and then load the entire big time consuming df into DB.

engine = create_engine('postgresql+psycopg2://username:password@host:port/database')

df.head(0).to_sql('table_name', engine, if_exists='replace',index=False) #truncates the table

conn = engine.raw_connection()
cur = conn.cursor()
output = io.StringIO()
df.to_csv(output, sep='\t', header=False, index=False)
output.seek(0)
contents = output.getvalue()
cur.copy_from(output, 'table_name', null="") # null values become ''
conn.commit()

回答 2

熊猫0.24.0+解决方案

在Pandas 0.24.0中引入了一个新功能,该功能是专为快速写入Postgres设计的。您可以在此处了解更多信息:https : //pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method

import csv
from io import StringIO

from sqlalchemy import create_engine

def psql_insert_copy(table, conn, keys, data_iter):
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)

        columns = ', '.join('"{}"'.format(k) for k in keys)
        if table.schema:
            table_name = '{}.{}'.format(table.schema, table.name)
        else:
            table_name = table.name

        sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
            table_name, columns)
        cur.copy_expert(sql=sql, file=s_buf)

engine = create_engine('postgresql://myusername:mypassword@myhost:5432/mydatabase')
df.to_sql('table_name', engine, method=psql_insert_copy)

Pandas 0.24.0+ solution

In Pandas 0.24.0 a new feature was introduced specifically designed for fast writes to Postgres. You can learn more about it here: https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method

import csv
from io import StringIO

from sqlalchemy import create_engine

def psql_insert_copy(table, conn, keys, data_iter):
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)

        columns = ', '.join('"{}"'.format(k) for k in keys)
        if table.schema:
            table_name = '{}.{}'.format(table.schema, table.name)
        else:
            table_name = table.name

        sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
            table_name, columns)
        cur.copy_expert(sql=sql, file=s_buf)

engine = create_engine('postgresql://myusername:mypassword@myhost:5432/mydatabase')
df.to_sql('table_name', engine, method=psql_insert_copy)

回答 3

这就是我做的。

可能更快,因为它正在使用execute_batch

# df is the dataframe
if len(df) > 0:
    df_columns = list(df)
    # create (col1,col2,...)
    columns = ",".join(df_columns)

    # create VALUES('%s', '%s",...) one '%s' per column
    values = "VALUES({})".format(",".join(["%s" for _ in df_columns])) 

    #create INSERT INTO table (columns) VALUES('%s',...)
    insert_stmt = "INSERT INTO {} ({}) {}".format(table,columns,values)

    cur = conn.cursor()
    psycopg2.extras.execute_batch(cur, insert_stmt, df.values)
    conn.commit()
    cur.close()

This is how I did it.

It may be faster because it is using execute_batch:

# df is the dataframe
if len(df) > 0:
    df_columns = list(df)
    # create (col1,col2,...)
    columns = ",".join(df_columns)

    # create VALUES('%s', '%s",...) one '%s' per column
    values = "VALUES({})".format(",".join(["%s" for _ in df_columns])) 

    #create INSERT INTO table (columns) VALUES('%s',...)
    insert_stmt = "INSERT INTO {} ({}) {}".format(table,columns,values)

    cur = conn.cursor()
    psycopg2.extras.execute_batch(cur, insert_stmt, df.values)
    conn.commit()
    cur.close()

回答 4

对于Python 2.7和Pandas 0.24.2并使用Psycopg2

Psycopg2连接模块

def dbConnect (db_parm, username_parm, host_parm, pw_parm):
    # Parse in connection information
    credentials = {'host': host_parm, 'database': db_parm, 'user': username_parm, 'password': pw_parm}
    conn = psycopg2.connect(**credentials)
    conn.autocommit = True  # auto-commit each entry to the database
    conn.cursor_factory = RealDictCursor
    cur = conn.cursor()
    print ("Connected Successfully to DB: " + str(db_parm) + "@" + str(host_parm))
    return conn, cur

连接到数据库

conn, cur = dbConnect(databaseName, dbUser, dbHost, dbPwd)

假设数据帧已经作为df存在

output = io.BytesIO() # For Python3 use StringIO
df.to_csv(output, sep='\t', header=True, index=False)
output.seek(0) # Required for rewinding the String object
copy_query = "COPY mem_info FROM STDOUT csv DELIMITER '\t' NULL ''  ESCAPE '\\' HEADER "  # Replace your table name in place of mem_info
cur.copy_expert(copy_query, output)
conn.commit()

For Python 2.7 and Pandas 0.24.2 and using Psycopg2

Psycopg2 Connection Module

def dbConnect (db_parm, username_parm, host_parm, pw_parm):
    # Parse in connection information
    credentials = {'host': host_parm, 'database': db_parm, 'user': username_parm, 'password': pw_parm}
    conn = psycopg2.connect(**credentials)
    conn.autocommit = True  # auto-commit each entry to the database
    conn.cursor_factory = RealDictCursor
    cur = conn.cursor()
    print ("Connected Successfully to DB: " + str(db_parm) + "@" + str(host_parm))
    return conn, cur

Connect to the database

conn, cur = dbConnect(databaseName, dbUser, dbHost, dbPwd)

Assuming dataframe to be present already as df

output = io.BytesIO() # For Python3 use StringIO
df.to_csv(output, sep='\t', header=True, index=False)
output.seek(0) # Required for rewinding the String object
copy_query = "COPY mem_info FROM STDOUT csv DELIMITER '\t' NULL ''  ESCAPE '\\' HEADER "  # Replace your table name in place of mem_info
cur.copy_expert(copy_query, output)
conn.commit()