问题:使用Python将CSV文件导入sqlite3数据库表
我有一个CSV文件,我想使用Python将该文件批量导入到我的sqlite3数据库中。该命令是“ .import …..”。但似乎不能这样工作。谁能给我一个在sqlite3中如何做的例子?我正在使用Windows,以防万一。谢谢
I have a CSV file and I want to bulk-import this file into my sqlite3 database using Python. The command is “.import …..”, but it seems that it cannot work like this. Can anyone give me an example of how to do it in sqlite3? I am using Windows, just in case.
Thanks
回答 0
import csv, sqlite3
# Pass a filesystem path (e.g. "your_filename.db") instead of ":memory:" to
# persist the data; sqlite3.connect() takes a path, not an SQLAlchemy-style
# "sqlite:///..." URL.
con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("CREATE TABLE t (col1, col2);")  # use your column names here

# newline="" leaves line-ending handling to the csv module, so quoted fields
# that contain embedded newlines are parsed correctly.
with open("data.csv", "r", newline="") as fin:
    # csv.DictReader uses the first line of the file for the column headings
    # by default; a comma is the default delimiter.
    dr = csv.DictReader(fin)
    to_db = [(row["col1"], row["col2"]) for row in dr]

cur.executemany("INSERT INTO t (col1, col2) VALUES (?, ?);", to_db)
con.commit()
con.close()
import csv, sqlite3
# Pass a filesystem path (e.g. "your_filename.db") instead of ":memory:" to
# persist the data; sqlite3.connect() takes a path, not an SQLAlchemy-style
# "sqlite:///..." URL.
con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("CREATE TABLE t (col1, col2);")  # use your column names here

# newline="" leaves line-ending handling to the csv module, so quoted fields
# that contain embedded newlines are parsed correctly.
with open("data.csv", "r", newline="") as fin:
    # csv.DictReader uses the first line of the file for the column headings
    # by default; a comma is the default delimiter.
    dr = csv.DictReader(fin)
    to_db = [(row["col1"], row["col2"]) for row in dr]

cur.executemany("INSERT INTO t (col1, col2) VALUES (?, ?);", to_db)
con.commit()
con.close()
回答 1
如何创建到磁盘文件的sqlite连接,留给读者作为练习……不过借助pandas库,现在只需两行代码即可完成:
# Read the CSV file into a DataFrame.
# NOTE(review): `pandas`, `csvfile`, `table_name` and `conn` are not defined in
# this snippet; `conn` is presumably an open sqlite3 connection - confirm.
df = pandas.read_csv(csvfile)
# Write the rows to `table_name` through `conn`, appending if the table
# already exists (if_exists='append') and leaving out the DataFrame index
# (index=False).
df.to_sql(table_name, conn, if_exists='append', index=False)
Creating an sqlite connection to a file on disk is left as an exercise for the reader … but there is now a two-liner made possible by the pandas library
# Read the CSV file into a DataFrame.
# NOTE(review): `pandas`, `csvfile`, `table_name` and `conn` are not defined in
# this snippet; `conn` is presumably an open sqlite3 connection - confirm.
df = pandas.read_csv(csvfile)
# Write the rows to `table_name` through `conn`, appending if the table
# already exists (if_exists='append') and leaving out the DataFrame index
# (index=False).
df.to_sql(table_name, conn, if_exists='append', index=False)
回答 2
我的2美分(更通用):
import csv, sqlite3
import logging
def _get_col_datatypes(fin):
    """Infer an SQLite column type for every column of a CSV stream.

    The first line of ``fin`` is taken as the header.  Each column is typed
    from its first non-empty value: ``"INTEGER"`` when that value consists
    only of digits, ``"TEXT"`` otherwise.  Scanning stops as soon as every
    column has a type.

    Args:
        fin: Iterable of CSV text lines (e.g. an open text file).  It is
            consumed; callers must rewind it before reading it again.

    Returns:
        dict mapping column name to ``"INTEGER"`` or ``"TEXT"``.  A stream
        without even a header line yields an empty dict.

    Raises:
        Exception: if some column has no non-empty value in any row
            (including a file that has a header but no data rows).
    """
    dr = csv.DictReader(fin)  # comma is default delimiter
    fieldTypes = {}
    # Reading ``fieldnames`` consumes the header; it is None for empty input.
    fieldsLeft = list(dr.fieldnames or [])
    for entry in dr:
        if not fieldsLeft:
            break  # We're done
        for field in fieldsLeft:
            data = entry[field]
            # Need data to decide; short rows yield None for missing fields.
            if not data:
                continue
            fieldTypes[field] = "INTEGER" if data.isdigit() else "TEXT"
        # TODO: Currently there's no support for DATE in SQLite
        # Recompute after every row so the check below never sees a stale
        # list when the last columns are resolved by the final row.
        fieldsLeft = [f for f in fieldsLeft if f not in fieldTypes]
    if fieldsLeft:
        raise Exception("Failed to find all the columns data types - Maybe some are empty?")
    return fieldTypes
def escapingGenerator(f):
    """Lazily yield the lines of ``f`` reduced to pure ASCII.

    Every character that cannot be encoded as ASCII is replaced by its XML
    numeric character reference (for example ``&#233;``).
    """
    yield from (
        text.encode("ascii", "xmlcharrefreplace").decode("ascii") for text in f
    )
def csvToDb(csvFile, outputToFile=False):
    """Load a CSV file into a table named ``ads`` in an in-memory SQLite DB.

    Column names come from the CSV header and column types from
    ``_get_col_datatypes``.  Data lines pass through ``escapingGenerator``
    before being parsed and inserted.

    Args:
        csvFile: Path of the CSV file; it is decoded as ISO-8859-1.
        outputToFile: Currently ignored.

    Returns:
        The open ``sqlite3.Connection`` holding the populated ``ads`` table;
        the caller is responsible for closing it.
    """
    # TODO: implement output to file
    # newline='' leaves line-ending handling to the csv module.
    with open(csvFile, mode='r', encoding="ISO-8859-1", newline='') as fin:
        dt = _get_col_datatypes(fin)

        fin.seek(0)
        # Keep the order of the columns name just as in the CSV
        fields = csv.DictReader(fin).fieldnames

        # Quote the identifiers so headers containing spaces or SQL keywords
        # still produce a valid CREATE TABLE statement.
        cols = ['"%s" %s' % (f.replace('"', '""'), dt[f]) for f in fields]

        con = sqlite3.connect(":memory:")
        cur = con.cursor()
        cur.execute("CREATE TABLE ads (%s)" % ",".join(cols))

        fin.seek(0)
        reader = csv.reader(escapingGenerator(fin))
        # The header already supplied the column names; skip it so it is not
        # inserted as a data row.
        next(reader, None)

        stmt = "INSERT INTO ads VALUES(%s);" % ','.join('?' * len(cols))
        # csv.reader yields [] for blank lines, which cannot be bound.
        cur.executemany(stmt, (row for row in reader if row))
        con.commit()
    return con
My 2 cents (more generic):
import csv, sqlite3
import logging
def _get_col_datatypes(fin):
    """Infer an SQLite column type for every column of a CSV stream.

    The first line of ``fin`` is taken as the header.  Each column is typed
    from its first non-empty value: ``"INTEGER"`` when that value consists
    only of digits, ``"TEXT"`` otherwise.  Scanning stops as soon as every
    column has a type.

    Args:
        fin: Iterable of CSV text lines (e.g. an open text file).  It is
            consumed; callers must rewind it before reading it again.

    Returns:
        dict mapping column name to ``"INTEGER"`` or ``"TEXT"``.  A stream
        without even a header line yields an empty dict.

    Raises:
        Exception: if some column has no non-empty value in any row
            (including a file that has a header but no data rows).
    """
    dr = csv.DictReader(fin)  # comma is default delimiter
    fieldTypes = {}
    # Reading ``fieldnames`` consumes the header; it is None for empty input.
    fieldsLeft = list(dr.fieldnames or [])
    for entry in dr:
        if not fieldsLeft:
            break  # We're done
        for field in fieldsLeft:
            data = entry[field]
            # Need data to decide; short rows yield None for missing fields.
            if not data:
                continue
            fieldTypes[field] = "INTEGER" if data.isdigit() else "TEXT"
        # TODO: Currently there's no support for DATE in SQLite
        # Recompute after every row so the check below never sees a stale
        # list when the last columns are resolved by the final row.
        fieldsLeft = [f for f in fieldsLeft if f not in fieldTypes]
    if fieldsLeft:
        raise Exception("Failed to find all the columns data types - Maybe some are empty?")
    return fieldTypes
def escapingGenerator(f):
    """Lazily yield the lines of ``f`` reduced to pure ASCII.

    Every character that cannot be encoded as ASCII is replaced by its XML
    numeric character reference (for example ``&#233;``).
    """
    yield from (
        text.encode("ascii", "xmlcharrefreplace").decode("ascii") for text in f
    )
def csvToDb(csvFile, outputToFile=False):
    """Load a CSV file into a table named ``ads`` in an in-memory SQLite DB.

    Column names come from the CSV header and column types from
    ``_get_col_datatypes``.  Data lines pass through ``escapingGenerator``
    before being parsed and inserted.

    Args:
        csvFile: Path of the CSV file; it is decoded as ISO-8859-1.
        outputToFile: Currently ignored.

    Returns:
        The open ``sqlite3.Connection`` holding the populated ``ads`` table;
        the caller is responsible for closing it.
    """
    # TODO: implement output to file
    # newline='' leaves line-ending handling to the csv module.
    with open(csvFile, mode='r', encoding="ISO-8859-1", newline='') as fin:
        dt = _get_col_datatypes(fin)

        fin.seek(0)
        # Keep the order of the columns name just as in the CSV
        fields = csv.DictReader(fin).fieldnames

        # Quote the identifiers so headers containing spaces or SQL keywords
        # still produce a valid CREATE TABLE statement.
        cols = ['"%s" %s' % (f.replace('"', '""'), dt[f]) for f in fields]

        con = sqlite3.connect(":memory:")
        cur = con.cursor()
        cur.execute("CREATE TABLE ads (%s)" % ",".join(cols))

        fin.seek(0)
        reader = csv.reader(escapingGenerator(fin))
        # The header already supplied the column names; skip it so it is not
        # inserted as a data row.
        next(reader, None)

        stmt = "INSERT INTO ads VALUES(%s);" % ','.join('?' * len(cols))
        # csv.reader yields [] for blank lines, which cannot be bound.
        cur.executemany(stmt, (row for row in reader if row))
        con.commit()
    return con
回答 3
该.import
命令是sqlite3命令行工具的功能。要在Python中完成此操作,您应该简单地使用Python具备的任何功能(例如csv模块)加载数据,然后照常插入数据。
这样,您还可以控制要插入的类型,而不必依赖sqlite3似乎没有记录的行为。
The .import
command is a feature of the sqlite3 command-line tool. To do it in Python, you should simply load the data using whatever facilities Python has, such as the csv module, and insert the data as usual.
This way, you also have control over what types are inserted, rather than relying on sqlite3’s seemingly undocumented behaviour.
回答 4
#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys, csv, sqlite3
def main():
    """Rebuild table ``t`` in an SQLite file from a two-column CSV file.

    Command-line arguments:
        sys.argv[1]: path of the SQLite database file.
        sys.argv[2]: path of a headerless, comma-delimited, UTF-8 CSV file.

    Any existing table ``t`` is dropped first.  Only the first two fields of
    each row are stored; blank lines are skipped.
    """
    con = sqlite3.connect(sys.argv[1])  # database file input
    try:
        cur = con.cursor()
        # Drop any previous copy so every run starts from a fresh table.
        cur.executescript("""
            DROP TABLE IF EXISTS t;
            CREATE TABLE t (COL1 TEXT, COL2 TEXT);
            """)

        # Text mode with an explicit encoding replaces the Python 2-only
        # "rb" + unicode() decoding; newline="" is what the csv module expects.
        with open(sys.argv[2], "r", encoding="utf8", newline="") as f:  # CSV file input
            reader = csv.reader(f, delimiter=',')  # no header row in the file
            # Insert into ``t``, the table created above.
            cur.executemany(
                "INSERT INTO t (COL1, COL2) VALUES(?, ?);",
                ((row[0], row[1]) for row in reader if row),
            )
        con.commit()
    finally:
        con.close()  # closes connection to database, even on error


if __name__ == '__main__':
    main()
#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys, csv, sqlite3
def main():
    """Rebuild table ``t`` in an SQLite file from a two-column CSV file.

    Command-line arguments:
        sys.argv[1]: path of the SQLite database file.
        sys.argv[2]: path of a headerless, comma-delimited, UTF-8 CSV file.

    Any existing table ``t`` is dropped first.  Only the first two fields of
    each row are stored; blank lines are skipped.
    """
    con = sqlite3.connect(sys.argv[1])  # database file input
    try:
        cur = con.cursor()
        # Drop any previous copy so every run starts from a fresh table.
        cur.executescript("""
            DROP TABLE IF EXISTS t;
            CREATE TABLE t (COL1 TEXT, COL2 TEXT);
            """)

        # Text mode with an explicit encoding replaces the Python 2-only
        # "rb" + unicode() decoding; newline="" is what the csv module expects.
        with open(sys.argv[2], "r", encoding="utf8", newline="") as f:  # CSV file input
            reader = csv.reader(f, delimiter=',')  # no header row in the file
            # Insert into ``t``, the table created above.
            cur.executemany(
                "INSERT INTO t (COL1, COL2) VALUES(?, ?);",
                ((row[0], row[1]) for row in reader if row),
            )
        con.commit()
    finally:
        con.close()  # closes connection to database, even on error


if __name__ == '__main__':
    main()
回答 5
非常感谢bernie的回答!必须进行一些调整-这对我有用:
import csv, sqlite3
conn = sqlite3.connect("pcfc.sl3")
curs = conn.cursor()
curs.execute("CREATE TABLE PCFC (id INTEGER PRIMARY KEY, type INTEGER, term TEXT, definition TEXT);")

# Text mode with an explicit encoding replaces the Python 2-only unicode()
# calls, and the ``with`` block closes the file once all rows are read.
with open('PC.txt', 'r', encoding='utf8', newline='') as source:
    reader = csv.reader(source, delimiter='|')
    for row in reader:
        # The sample file pads the '|' delimiter with spaces
        # ("1 | Term 1 | Definition 1"); strip the padding so it is not
        # stored as part of the values.
        to_db = [field.strip() for field in row[:3]]
        curs.execute("INSERT INTO PCFC (type, term, definition) VALUES (?, ?, ?);", to_db)

conn.commit()
conn.close()
我的文本文件(PC.txt)如下所示:
1 | Term 1 | Definition 1
2 | Term 2 | Definition 2
3 | Term 3 | Definition 3
Many thanks for bernie’s answer! Had to tweak it a bit – here’s what worked for me:
import csv, sqlite3
conn = sqlite3.connect("pcfc.sl3")
curs = conn.cursor()
curs.execute("CREATE TABLE PCFC (id INTEGER PRIMARY KEY, type INTEGER, term TEXT, definition TEXT);")

# Text mode with an explicit encoding replaces the Python 2-only unicode()
# calls, and the ``with`` block closes the file once all rows are read.
with open('PC.txt', 'r', encoding='utf8', newline='') as source:
    reader = csv.reader(source, delimiter='|')
    for row in reader:
        # The sample file pads the '|' delimiter with spaces
        # ("1 | Term 1 | Definition 1"); strip the padding so it is not
        # stored as part of the values.
        to_db = [field.strip() for field in row[:3]]
        curs.execute("INSERT INTO PCFC (type, term, definition) VALUES (?, ?, ?);", to_db)

conn.commit()
conn.close()
My text file (PC.txt) looks like this:
1 | Term 1 | Definition 1
2 | Term 2 | Definition 2
3 | Term 3 | Definition 3
回答 6
您说得对,.import
是正确的方法,但它是SQLite3.exe命令行Shell中的命令。这个问题的许多高票答案都使用原生Python循环,但如果文件很大(我的文件有10^6到10^7条记录),就应避免把所有内容读入pandas,或使用原生Python列表推导式/循环(不过我没有对它们计时比较)。
对于大文件,我认为最好的选择是先用sqlite3.execute("CREATE TABLE...")
预先创建空表,去掉CSV文件的标题行,然后使用subprocess.run()
执行sqlite的import语句。由于我认为最后一部分最为相关,所以我将从它开始讲。
subprocess.run()
import subprocess
from pathlib import Path

db_name = Path('my.db').resolve()
csv_file = Path('file.csv').resolve()
# Placeholder: replace with the name of the (pre-created) destination table.
table_name = '<table_name>'

# Each list element reaches sqlite3 as one argument, so no shell quoting is
# needed.  Backslashes in a Windows path are doubled because the ".import"
# argument is handled as a quoted string by the sqlite3 shell.
import_cmd = '.import ' + str(csv_file).replace('\\', '\\\\') + ' ' + table_name
result = subprocess.run(
    ['sqlite3', str(db_name), '-cmd', '.mode csv', import_cmd],
    capture_output=True,
)
解释
在命令行中,您要查找的命令是sqlite3 my.db -cmd ".mode csv" ".import file.csv table"
。 subprocess.run()
运行一个命令行进程。传给subprocess.run()
的参数是一个字符串序列,这些字符串被解释为一条命令及其后跟的所有参数。
sqlite3 my.db
打开数据库
-cmd
标志放在数据库之后,允许您向sqlite程序传递多个后续命令。在Shell中,每个命令都必须用引号括起来,但在这里,它们只需各自作为序列中的一个元素即可
'.mode csv'
符合您的期望
'.import '+str(csv_file).replace('\\','\\\\')+' <table_name>'
是导入命令。
不幸的是,由于subprocess会把-cmd
之后的所有后续命令都作为带引号的字符串传递,因此如果您使用Windows目录路径,就需要将反斜杠加倍。
剥离标题
这并不是问题的重点,但这是我所使用的。同样,我不想在任何时候将整个文件读入内存:
import shutil

# Copy everything after the first (header) line to "<path>_nohead" in
# buffered chunks, so the file is never held in memory as a whole.
# ``csv_file`` is the CSV path; the name avoids shadowing the ``csv`` module.
# newline="" on both files copies the remaining line endings unchanged.
# NOTE: this assumes the header occupies exactly one physical line.
with open(csv_file, "r", newline="") as source:
    source.readline()  # consume and discard the header line
    with open(str(csv_file) + "_nohead", "w", newline="") as target:
        shutil.copyfileobj(source, target)
You’re right that .import
is the way to go, but that’s a command from the SQLite3.exe shell. A lot of the top answers to this question involve native python loops, but if your files are large (mine are 10^6 to 10^7 records), you want to avoid reading everything into pandas or using a native python list comprehension/loop (though I did not time them for comparison).
For large files, I believe the best option is to create the empty table in advance using sqlite3.execute("CREATE TABLE...")
, strip the headers from your CSV files, and then use subprocess.run()
to execute sqlite’s import statement. Since I believe the last part is the most pertinent, I will start with that.
subprocess.run()
import subprocess
from pathlib import Path

db_name = Path('my.db').resolve()
csv_file = Path('file.csv').resolve()
# Placeholder: replace with the name of the (pre-created) destination table.
table_name = '<table_name>'

# Each list element reaches sqlite3 as one argument, so no shell quoting is
# needed.  Backslashes in a Windows path are doubled because the ".import"
# argument is handled as a quoted string by the sqlite3 shell.
import_cmd = '.import ' + str(csv_file).replace('\\', '\\\\') + ' ' + table_name
result = subprocess.run(
    ['sqlite3', str(db_name), '-cmd', '.mode csv', import_cmd],
    capture_output=True,
)
Explanation
From the command line, the command you’re looking for is sqlite3 my.db -cmd ".mode csv" ".import file.csv table"
. subprocess.run()
runs a command line process. The argument to subprocess.run()
is a sequence of strings which are interpreted as a command followed by all of its arguments.
sqlite3 my.db
opens the database
-cmd
flag after the database allows you to pass multiple follow on commands to the sqlite program. In the shell, each command has to be in quotes, but here, they just need to be their own element of the sequence
'.mode csv'
does what you’d expect
'.import '+str(csv_file).replace('\\','\\\\')+' <table_name>'
is the import command.
Unfortunately, since subprocess passes all follow-ons to -cmd
as quoted strings, you need to double up your backslashes if you have a windows directory path.
Stripping Headers
Not really the main point of the question, but here’s what I used. Again, I didn’t want to read the whole files into memory at any point:
import shutil

# Copy everything after the first (header) line to "<path>_nohead" in
# buffered chunks, so the file is never held in memory as a whole.
# ``csv_file`` is the CSV path; the name avoids shadowing the ``csv`` module.
# newline="" on both files copies the remaining line endings unchanged.
# NOTE: this assumes the header occupies exactly one physical line.
with open(csv_file, "r", newline="") as source:
    source.readline()  # consume and discard the header line
    with open(str(csv_file) + "_nohead", "w", newline="") as target:
        shutil.copyfileobj(source, target)
回答 7
基于Guy L解决方案(喜欢它),但可以处理转义的字段。
import csv, sqlite3
def _get_col_datatypes(fin):
    """Infer an SQLite column type for every column of a CSV stream.

    The first line of ``fin`` is taken as the header.  Each column is typed
    from its first non-empty value: ``"INTEGER"`` when that value consists
    only of digits, ``"TEXT"`` otherwise.  Scanning stops as soon as every
    column has a type.

    Args:
        fin: Iterable of CSV text lines (e.g. an open text file).  It is
            consumed; callers must rewind it before reading it again.

    Returns:
        dict mapping column name to ``"INTEGER"`` or ``"TEXT"``.  A stream
        without even a header line yields an empty dict.

    Raises:
        Exception: if some column has no non-empty value in any row
            (including a file that has a header but no data rows).
    """
    dr = csv.DictReader(fin)  # comma is default delimiter
    fieldTypes = {}
    # Reading ``fieldnames`` consumes the header; it is None for empty input.
    fieldsLeft = list(dr.fieldnames or [])
    for entry in dr:
        if not fieldsLeft:
            break  # We're done
        for field in fieldsLeft:
            data = entry[field]
            # Need data to decide; short rows yield None for missing fields.
            if not data:
                continue
            fieldTypes[field] = "INTEGER" if data.isdigit() else "TEXT"
        # TODO: Currently there's no support for DATE in SQLite
        # Recompute after every row so the check below never sees a stale
        # list when the last columns are resolved by the final row.
        fieldsLeft = [f for f in fieldsLeft if f not in fieldTypes]
    if fieldsLeft:
        raise Exception("Failed to find all the columns data types - Maybe some are empty?")
    return fieldTypes
def escapingGenerator(f):
    """Lazily yield the lines of ``f`` reduced to pure ASCII.

    Every character that cannot be encoded as ASCII is replaced by its XML
    numeric character reference (for example ``&#233;``).
    """
    yield from (
        text.encode("ascii", "xmlcharrefreplace").decode("ascii") for text in f
    )
def csvToDb(csvFile, dbFile, tablename, outputToFile=False):
    """Load a CSV file into a table of an SQLite database file.

    Column names come from the CSV header and column types from
    ``_get_col_datatypes``.  Data lines pass through ``escapingGenerator``
    before being parsed and inserted.  The generated CREATE TABLE statement
    is printed to stdout.

    Args:
        csvFile: Path of the CSV file; it is decoded as ISO-8859-1.
        dbFile: Database path handed to ``sqlite3.connect``.
        tablename: Target table; created if it does not exist yet,
            otherwise the rows are added to the existing table.
        outputToFile: Currently ignored.

    Returns:
        None.  The connection is committed and closed before returning.
    """
    # TODO: implement output to file
    def _quote(identifier):
        # Double embedded quotes so any header or table name stays a valid
        # double-quoted SQL identifier.
        return '"%s"' % identifier.replace('"', '""')

    # newline='' leaves line-ending handling to the csv module.
    with open(csvFile, mode='r', encoding="ISO-8859-1", newline='') as fin:
        dt = _get_col_datatypes(fin)

        fin.seek(0)
        # Keep the order of the columns name just as in the CSV
        fields = csv.DictReader(fin).fieldnames
        cols = ["%s %s" % (_quote(f), dt[f]) for f in fields]

        stmt = "create table if not exists %s (%s)" % (_quote(tablename), ",".join(cols))
        print(stmt)

        con = sqlite3.connect(dbFile)
        try:
            cur = con.cursor()
            cur.execute(stmt)

            fin.seek(0)
            reader = csv.reader(escapingGenerator(fin))
            # The header already supplied the column names; skip it so it is
            # not inserted as a data row.
            next(reader, None)

            stmt = "INSERT INTO %s VALUES(%s);" % (_quote(tablename), ','.join('?' * len(cols)))
            # csv.reader yields [] for blank lines, which cannot be bound.
            cur.executemany(stmt, (row for row in reader if row))
            con.commit()
        finally:
            # Release the database file even when the import fails.
            con.close()
Based on Guy L solution (Love it) but can handle escaped fields.
import csv, sqlite3
def _get_col_datatypes(fin):
    """Infer an SQLite column type for every column of a CSV stream.

    The first line of ``fin`` is taken as the header.  Each column is typed
    from its first non-empty value: ``"INTEGER"`` when that value consists
    only of digits, ``"TEXT"`` otherwise.  Scanning stops as soon as every
    column has a type.

    Args:
        fin: Iterable of CSV text lines (e.g. an open text file).  It is
            consumed; callers must rewind it before reading it again.

    Returns:
        dict mapping column name to ``"INTEGER"`` or ``"TEXT"``.  A stream
        without even a header line yields an empty dict.

    Raises:
        Exception: if some column has no non-empty value in any row
            (including a file that has a header but no data rows).
    """
    dr = csv.DictReader(fin)  # comma is default delimiter
    fieldTypes = {}
    # Reading ``fieldnames`` consumes the header; it is None for empty input.
    fieldsLeft = list(dr.fieldnames or [])
    for entry in dr:
        if not fieldsLeft:
            break  # We're done
        for field in fieldsLeft:
            data = entry[field]
            # Need data to decide; short rows yield None for missing fields.
            if not data:
                continue
            fieldTypes[field] = "INTEGER" if data.isdigit() else "TEXT"
        # TODO: Currently there's no support for DATE in SQLite
        # Recompute after every row so the check below never sees a stale
        # list when the last columns are resolved by the final row.
        fieldsLeft = [f for f in fieldsLeft if f not in fieldTypes]
    if fieldsLeft:
        raise Exception("Failed to find all the columns data types - Maybe some are empty?")
    return fieldTypes
def escapingGenerator(f):
    """Lazily yield the lines of ``f`` reduced to pure ASCII.

    Every character that cannot be encoded as ASCII is replaced by its XML
    numeric character reference (for example ``&#233;``).
    """
    yield from (
        text.encode("ascii", "xmlcharrefreplace").decode("ascii") for text in f
    )
def csvToDb(csvFile, dbFile, tablename, outputToFile=False):
    """Load a CSV file into a table of an SQLite database file.

    Column names come from the CSV header and column types from
    ``_get_col_datatypes``.  Data lines pass through ``escapingGenerator``
    before being parsed and inserted.  The generated CREATE TABLE statement
    is printed to stdout.

    Args:
        csvFile: Path of the CSV file; it is decoded as ISO-8859-1.
        dbFile: Database path handed to ``sqlite3.connect``.
        tablename: Target table; created if it does not exist yet,
            otherwise the rows are added to the existing table.
        outputToFile: Currently ignored.

    Returns:
        None.  The connection is committed and closed before returning.
    """
    # TODO: implement output to file
    def _quote(identifier):
        # Double embedded quotes so any header or table name stays a valid
        # double-quoted SQL identifier.
        return '"%s"' % identifier.replace('"', '""')

    # newline='' leaves line-ending handling to the csv module.
    with open(csvFile, mode='r', encoding="ISO-8859-1", newline='') as fin:
        dt = _get_col_datatypes(fin)

        fin.seek(0)
        # Keep the order of the columns name just as in the CSV
        fields = csv.DictReader(fin).fieldnames
        cols = ["%s %s" % (_quote(f), dt[f]) for f in fields]

        stmt = "create table if not exists %s (%s)" % (_quote(tablename), ",".join(cols))
        print(stmt)

        con = sqlite3.connect(dbFile)
        try:
            cur = con.cursor()
            cur.execute(stmt)

            fin.seek(0)
            reader = csv.reader(escapingGenerator(fin))
            # The header already supplied the column names; skip it so it is
            # not inserted as a data row.
            next(reader, None)

            stmt = "INSERT INTO %s VALUES(%s);" % (_quote(tablename), ','.join('?' * len(cols)))
            # csv.reader yields [] for blank lines, which cannot be bound.
            cur.executemany(stmt, (row for row in reader if row))
            con.commit()
        finally:
            # Release the database file even when the import fails.
            con.close()
回答 8
为此,您可以使用blaze
和odo
来高效地完成这一操作
import blaze as bz
# Path of the CSV file to load.
csv_path = 'data.csv'
# Hand the CSV path and the target URI to odo.  Going by the URI, the target
# is `data` inside the SQLite file data.db.
# NOTE(review): blaze/odo are third-party packages; confirm the URI semantics
# against their documentation.
bz.odo(csv_path, 'sqlite:///data.db::data')
Odo将csv文件存储到data.db
(sqlite数据库)中的data模式下
或者,您也可以直接使用odo
,而无需使用blaze
。两种方法都可以。阅读本文档
You can do this using blaze
& odo
efficiently
import blaze as bz
# Path of the CSV file to load.
csv_path = 'data.csv'
# Hand the CSV path and the target URI to odo.  Going by the URI, the target
# is `data` inside the SQLite file data.db.
# NOTE(review): blaze/odo are third-party packages; confirm the URI semantics
# against their documentation.
bz.odo(csv_path, 'sqlite:///data.db::data')
Odo will store the csv file to data.db
(sqlite database) under the schema data
Or you use odo
directly, without blaze
. Either way is fine. Read this documentation
回答 9
如果必须在Python程序中导入CSV文件,那么为了简单和高效,您可以使用os.system
,做法大致如下:
import os
# The dot-commands are passed to sqlite3 as quoted arguments.  A bash-style
# "<<<" here-string would not work here, because os.system() runs the command
# through cmd.exe on Windows and /bin/sh elsewhere.  ".mode csv" comes first
# so that ".import" parses the input as comma-separated values.
cmd = 'sqlite3 database.db -cmd ".mode csv" ".import input.csv mytable"'
rc = os.system(cmd)
print(rc)
关键是,通过指定数据库的文件名,假设读取数据没有错误,数据将自动保存。
If the CSV file must be imported as part of a python program, then for simplicity and efficiency, you could use os.system
along the lines suggested by the following:
import os
# The dot-commands are passed to sqlite3 as quoted arguments.  A bash-style
# "<<<" here-string would not work here, because os.system() runs the command
# through cmd.exe on Windows and /bin/sh elsewhere.  ".mode csv" comes first
# so that ".import" parses the input as comma-separated values.
cmd = 'sqlite3 database.db -cmd ".mode csv" ".import input.csv mytable"'
rc = os.system(cmd)
print(rc)
The point is that by specifying the filename of the database, the data will automatically be saved, assuming there are no errors reading it.
回答 10
import csv, sqlite3
def _get_col_datatypes(fin):
    """Infer an SQLite column type for every column of a CSV stream.

    The first line of ``fin`` is taken as the header.  Each column is typed
    from its first non-empty value: ``"INTEGER"`` when that value consists
    only of digits, ``"TEXT"`` otherwise.  Scanning stops as soon as every
    column has a type.

    Args:
        fin: Iterable of CSV text lines (e.g. an open text file).  It is
            consumed; callers must rewind it before reading it again.

    Returns:
        dict mapping column name to ``"INTEGER"`` or ``"TEXT"``.  A stream
        without even a header line yields an empty dict.

    Raises:
        Exception: if some column has no non-empty value in any row
            (including a file that has a header but no data rows).
    """
    dr = csv.DictReader(fin)  # comma is default delimiter
    fieldTypes = {}
    # Reading ``fieldnames`` consumes the header; it is None for empty input.
    fieldsLeft = list(dr.fieldnames or [])
    for entry in dr:
        if not fieldsLeft:
            break  # We're done
        for field in fieldsLeft:
            data = entry[field]
            # Need data to decide; short rows yield None for missing fields.
            if not data:
                continue
            fieldTypes[field] = "INTEGER" if data.isdigit() else "TEXT"
        # TODO: Currently there's no support for DATE in SQLite
        # Recompute after every row so the check below never sees a stale
        # list when the last columns are resolved by the final row.
        fieldsLeft = [f for f in fieldsLeft if f not in fieldTypes]
    if fieldsLeft:
        raise Exception("Failed to find all the columns data types - Maybe some are empty?")
    return fieldTypes
def escapingGenerator(f):
    """Lazily yield the lines of ``f`` reduced to pure ASCII.

    Every character that cannot be encoded as ASCII is replaced by its XML
    numeric character reference (for example ``&#233;``).
    """
    yield from (
        text.encode("ascii", "xmlcharrefreplace").decode("ascii") for text in f
    )
def csvToDb(csvFile, dbFile, tablename, outputToFile=False):
    """Load a CSV file into a table of an SQLite database file.

    Column names come from the CSV header and column types from
    ``_get_col_datatypes``.  Data lines pass through ``escapingGenerator``
    before being parsed and inserted.  The generated CREATE TABLE statement
    is printed to stdout.

    Args:
        csvFile: Path of the CSV file; it is decoded as ISO-8859-1.
        dbFile: Database path handed to ``sqlite3.connect``.
        tablename: Target table; created if it does not exist yet,
            otherwise the rows are added to the existing table.
        outputToFile: Currently ignored.

    Returns:
        None.  The connection is committed and closed before returning.
    """
    # TODO: implement output to file
    def _quote(identifier):
        # Double embedded quotes so any header or table name stays a valid
        # double-quoted SQL identifier.
        return '"%s"' % identifier.replace('"', '""')

    # newline='' leaves line-ending handling to the csv module.
    with open(csvFile, mode='r', encoding="ISO-8859-1", newline='') as fin:
        dt = _get_col_datatypes(fin)

        fin.seek(0)
        # Keep the order of the columns name just as in the CSV
        fields = csv.DictReader(fin).fieldnames
        cols = ["%s %s" % (_quote(f), dt[f]) for f in fields]

        stmt = "create table if not exists %s (%s)" % (_quote(tablename), ",".join(cols))
        print(stmt)

        con = sqlite3.connect(dbFile)
        try:
            cur = con.cursor()
            cur.execute(stmt)

            fin.seek(0)
            reader = csv.reader(escapingGenerator(fin))
            # The header already supplied the column names; skip it so it is
            # not inserted as a data row.
            next(reader, None)

            stmt = "INSERT INTO %s VALUES(%s);" % (_quote(tablename), ','.join('?' * len(cols)))
            # csv.reader yields [] for blank lines, which cannot be bound.
            cur.executemany(stmt, (row for row in reader if row))
            con.commit()
        finally:
            # Release the database file even when the import fails.
            con.close()
import csv, sqlite3
def _get_col_datatypes(fin):
    """Infer an SQLite column type for every column of a CSV stream.

    The first line of ``fin`` is taken as the header.  Each column is typed
    from its first non-empty value: ``"INTEGER"`` when that value consists
    only of digits, ``"TEXT"`` otherwise.  Scanning stops as soon as every
    column has a type.

    Args:
        fin: Iterable of CSV text lines (e.g. an open text file).  It is
            consumed; callers must rewind it before reading it again.

    Returns:
        dict mapping column name to ``"INTEGER"`` or ``"TEXT"``.  A stream
        without even a header line yields an empty dict.

    Raises:
        Exception: if some column has no non-empty value in any row
            (including a file that has a header but no data rows).
    """
    dr = csv.DictReader(fin)  # comma is default delimiter
    fieldTypes = {}
    # Reading ``fieldnames`` consumes the header; it is None for empty input.
    fieldsLeft = list(dr.fieldnames or [])
    for entry in dr:
        if not fieldsLeft:
            break  # We're done
        for field in fieldsLeft:
            data = entry[field]
            # Need data to decide; short rows yield None for missing fields.
            if not data:
                continue
            fieldTypes[field] = "INTEGER" if data.isdigit() else "TEXT"
        # TODO: Currently there's no support for DATE in SQLite
        # Recompute after every row so the check below never sees a stale
        # list when the last columns are resolved by the final row.
        fieldsLeft = [f for f in fieldsLeft if f not in fieldTypes]
    if fieldsLeft:
        raise Exception("Failed to find all the columns data types - Maybe some are empty?")
    return fieldTypes
def escapingGenerator(f):
    """Lazily yield the lines of ``f`` reduced to pure ASCII.

    Every character that cannot be encoded as ASCII is replaced by its XML
    numeric character reference (for example ``&#233;``).
    """
    yield from (
        text.encode("ascii", "xmlcharrefreplace").decode("ascii") for text in f
    )
def csvToDb(csvFile, dbFile, tablename, outputToFile=False):
    """Load a CSV file into a table of an SQLite database file.

    Column names come from the CSV header and column types from
    ``_get_col_datatypes``.  Data lines pass through ``escapingGenerator``
    before being parsed and inserted.  The generated CREATE TABLE statement
    is printed to stdout.

    Args:
        csvFile: Path of the CSV file; it is decoded as ISO-8859-1.
        dbFile: Database path handed to ``sqlite3.connect``.
        tablename: Target table; created if it does not exist yet,
            otherwise the rows are added to the existing table.
        outputToFile: Currently ignored.

    Returns:
        None.  The connection is committed and closed before returning.
    """
    # TODO: implement output to file
    def _quote(identifier):
        # Double embedded quotes so any header or table name stays a valid
        # double-quoted SQL identifier.
        return '"%s"' % identifier.replace('"', '""')

    # newline='' leaves line-ending handling to the csv module.
    with open(csvFile, mode='r', encoding="ISO-8859-1", newline='') as fin:
        dt = _get_col_datatypes(fin)

        fin.seek(0)
        # Keep the order of the columns name just as in the CSV
        fields = csv.DictReader(fin).fieldnames
        cols = ["%s %s" % (_quote(f), dt[f]) for f in fields]

        stmt = "create table if not exists %s (%s)" % (_quote(tablename), ",".join(cols))
        print(stmt)

        con = sqlite3.connect(dbFile)
        try:
            cur = con.cursor()
            cur.execute(stmt)

            fin.seek(0)
            reader = csv.reader(escapingGenerator(fin))
            # The header already supplied the column names; skip it so it is
            # not inserted as a data row.
            next(reader, None)

            stmt = "INSERT INTO %s VALUES(%s);" % (_quote(tablename), ','.join('?' * len(cols)))
            # csv.reader yields [] for blank lines, which cannot be bound.
            cur.executemany(stmt, (row for row in reader if row))
            con.commit()
        finally:
            # Release the database file even when the import fails.
            con.close()
回答 11
为了简单起见,您可以使用项目的Makefile中的sqlite3命令行工具。
# Build the SQLite database <name>.sql3 from <name>.csv.
# $@ is the target database, $< the CSV prerequisite and $* the pattern stem,
# which is used as the table name.  Any existing database file is removed
# first, then the CSV is imported in csv mode.
%.sql3: %.csv
	rm -f $@
	sqlite3 $@ -echo -cmd ".mode csv" ".import $< $*"

# Print every row of the table named after the stem from <name>.sql3.
%.dump: %.sql3
	sqlite3 $< "select * from $*"
make test.sql3
然后从现有的test.csv文件使用单个表“ test”创建sqlite数据库。然后make test.dump
,您可以验证内容。
in the interest of simplicity, you could use the sqlite3 command line tool from the Makefile of your project.
# Build the SQLite database <name>.sql3 from <name>.csv.
# $@ is the target database, $< the CSV prerequisite and $* the pattern stem,
# which is used as the table name.  Any existing database file is removed
# first, then the CSV is imported in csv mode.
%.sql3: %.csv
	rm -f $@
	sqlite3 $@ -echo -cmd ".mode csv" ".import $< $*"

# Print every row of the table named after the stem from <name>.sql3.
%.dump: %.sql3
	sqlite3 $< "select * from $*"
make test.sql3
then creates the sqlite database from an existing test.csv file, with a single table “test”. you can then make test.dump
to verify the contents.
回答 12
我发现有时需要把从csv到数据库的数据传输分批进行,以免耗尽内存。可以这样完成:
import csv
import sqlite3
from operator import itemgetter
# Establish connection
conn = sqlite3.connect("mydb.db")

# Create the table
conn.execute(
    """
    CREATE TABLE persons(
        person_id INTEGER,
        last_name TEXT,
        first_name TEXT,
        address TEXT
    )
    """
)

# These are the columns from the csv that we want
cols = ["person_id", "last_name", "first_name", "address"]
# Build the row extractor once instead of once per row.
pick_cols = itemgetter(*cols)

insert_sql = """
    INSERT INTO persons
    VALUES(?, ?, ?, ?)
    """

# If the csv file is huge, we instead add the data in chunks
chunksize = 10000

# Parse csv file and populate db in chunks.  Using the connection as a
# context manager commits on success and rolls back if an exception escapes.
with conn, open("persons.csv", newline="") as f:
    reader = csv.DictReader(f)
    chunk = []
    for row in reader:
        chunk.append(pick_cols(row))
        if len(chunk) >= chunksize:
            conn.executemany(insert_sql, chunk)
            chunk = []
    # Flush the final, possibly partial, chunk; without this the last rows
    # of the file would never reach the database.
    if chunk:
        conn.executemany(insert_sql, chunk)

conn.close()
I’ve found that it can be necessary to break up the transfer of data from the csv to the database into chunks so as not to run out of memory. This can be done like this:
import csv
import sqlite3
from operator import itemgetter
# Establish connection
conn = sqlite3.connect("mydb.db")

# Create the table
conn.execute(
    """
    CREATE TABLE persons(
        person_id INTEGER,
        last_name TEXT,
        first_name TEXT,
        address TEXT
    )
    """
)

# These are the columns from the csv that we want
cols = ["person_id", "last_name", "first_name", "address"]
# Build the row extractor once instead of once per row.
pick_cols = itemgetter(*cols)

insert_sql = """
    INSERT INTO persons
    VALUES(?, ?, ?, ?)
    """

# If the csv file is huge, we instead add the data in chunks
chunksize = 10000

# Parse csv file and populate db in chunks.  Using the connection as a
# context manager commits on success and rolls back if an exception escapes.
with conn, open("persons.csv", newline="") as f:
    reader = csv.DictReader(f)
    chunk = []
    for row in reader:
        chunk.append(pick_cols(row))
        if len(chunk) >= chunksize:
            conn.executemany(insert_sql, chunk)
            chunk = []
    # Flush the final, possibly partial, chunk; without this the last rows
    # of the file would never reach the database.
    if chunk:
        conn.executemany(insert_sql, chunk)

conn.close()