Question: How do I import data from MongoDB into pandas?
I have a large amount of data in a MongoDB collection that I need to analyze. How do I import that data into pandas?
I am new to pandas and numpy.
EDIT:
The MongoDB collection contains sensor values tagged with date and time. The sensor values are of float datatype.
Sample data:
{
    "_cls" : "SensorReport",
    "_id" : ObjectId("515a963b78f6a035d9fa531b"),
    "_types" : [
        "SensorReport"
    ],
    "Readings" : [
        {
            "a" : 0.958069536790466,
            "_types" : [
                "Reading"
            ],
            "ReadingUpdatedDate" : ISODate("2013-04-02T08:26:35.297Z"),
            "b" : 6.296118156595,
            "_cls" : "Reading"
        },
        {
            "a" : 0.95574014778624,
            "_types" : [
                "Reading"
            ],
            "ReadingUpdatedDate" : ISODate("2013-04-02T08:27:09.963Z"),
            "b" : 6.29651468650064,
            "_cls" : "Reading"
        },
        {
            "a" : 0.953648289182713,
            "_types" : [
                "Reading"
            ],
            "ReadingUpdatedDate" : ISODate("2013-04-02T08:27:37.545Z"),
            "b" : 7.29679823731148,
            "_cls" : "Reading"
        },
        {
            "a" : 0.955931884300997,
            "_types" : [
                "Reading"
            ],
            "ReadingUpdatedDate" : ISODate("2013-04-02T08:28:21.369Z"),
            "b" : 6.29642922525632,
            "_cls" : "Reading"
        },
        {
            "a" : 0.95821381,
            "_types" : [
                "Reading"
            ],
            "ReadingUpdatedDate" : ISODate("2013-04-02T08:41:20.801Z"),
            "b" : 7.28956613,
            "_cls" : "Reading"
        },
        {
            "a" : 4.95821335,
            "_types" : [
                "Reading"
            ],
            "ReadingUpdatedDate" : ISODate("2013-04-02T08:41:36.931Z"),
            "b" : 6.28956574,
            "_cls" : "Reading"
        },
        {
            "a" : 9.95821341,
            "_types" : [
                "Reading"
            ],
            "ReadingUpdatedDate" : ISODate("2013-04-02T08:42:09.971Z"),
            "b" : 0.28956488,
            "_cls" : "Reading"
        },
        {
            "a" : 1.95667927,
            "_types" : [
                "Reading"
            ],
            "ReadingUpdatedDate" : ISODate("2013-04-02T08:43:55.463Z"),
            "b" : 0.29115237,
            "_cls" : "Reading"
        }
    ],
    "latestReportTime" : ISODate("2013-04-02T08:43:55.463Z"),
    "sensorName" : "56847890-0",
    "reportCount" : 8
}
Answer 0
pymongo might give you a hand. Here is some code I'm using:
import pandas as pd
from pymongo import MongoClient


def _connect_mongo(host, port, username, password, db):
    """A util for making a connection to mongo."""
    if username and password:
        mongo_uri = 'mongodb://%s:%s@%s:%s/%s' % (username, password, host, port, db)
        conn = MongoClient(mongo_uri)
    else:
        conn = MongoClient(host, port)
    return conn[db]


def read_mongo(db, collection, query=None, host='localhost', port=27017,
               username=None, password=None, no_id=True):
    """Read from Mongo and store the result in a DataFrame."""
    # Use None as the default to avoid sharing one mutable dict between calls
    if query is None:
        query = {}
    # Connect to MongoDB
    db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)
    # Make a query to the specific DB and Collection
    cursor = db[collection].find(query)
    # Expand the cursor and construct the DataFrame
    df = pd.DataFrame(list(cursor))
    # Delete the _id column (absent when the query matched no documents)
    if no_id and '_id' in df.columns:
        del df['_id']
    return df
Answer 1
You can load your MongoDB data into a pandas DataFrame using this code. It works for me; hopefully it works for you too.
import pandas as pd
from pymongo import MongoClient
# Connects to localhost:27017 by default
client = MongoClient()
# Replace database_name and collection_name with your own names
db = client.database_name
collection = db.collection_name
data = pd.DataFrame(list(collection.find()))
Answer 2
Answer 3
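As per PEP 20, simple is better than complex:
import pandas as pd
from pymongo import MongoClient
client = MongoClient()
# Replace database_name and collection_name with your own names
df = pd.DataFrame.from_records(client.database_name.collection_name.find())
You can include conditions as you would when working with a regular MongoDB database, or even use find_one() to get only one element from the database (it returns a single dict rather than a cursor, so wrap it in a list before passing it to from_records), etc.
And voila!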
Answer 4
import pandas as pd
from odo import odo
data = odo('mongodb://localhost/db::collection', pd.DataFrame)
Answer 5
To deal with out-of-core data (data that does not fit into RAM) efficiently, i.e. with parallel execution, you can try the Python Blaze ecosystem: Blaze / Dask / Odo.
Blaze (and Odo) have out-of-the-box functions to deal with MongoDB.
A few useful articles to start off:
And an article which shows what amazing things are possible with the Blaze stack: Analyzing 1.7 Billion Reddit Comments with Blaze and Impala (essentially, querying 975 GB of Reddit comments in seconds).
P.S. I’m not affiliated with any of these technologies.
Answer 6
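Another option I found very useful is json_normalize. In pandas 1.0 and later it is available as pd.json_normalize; the older pandas.io.json import path is deprecated.
import pandas as pd
cursor = my_collection.find()
# Materialize the cursor so json_normalize receives a list of dicts
df = pd.json_normalize(list(cursor))
This way you get the unfolding of nested MongoDB documents for free: nested sub-documents become dotted column names.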
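For the sample document in the question, the values of interest sit inside the Readings array, and a plain json_normalize call leaves an array in a single column. The sketch below shows one way to get one row per reading instead. It assumes pandas 1.0 or later, and it uses a trimmed in-memory copy of the sample document; the name docs is a hypothetical stand-in for list(my_collection.find()).

```python
from datetime import datetime

import pandas as pd

# Hypothetical stand-in for list(my_collection.find()):
# a trimmed copy of the sample document from the question.
docs = [
    {
        "sensorName": "56847890-0",
        "reportCount": 8,
        "Readings": [
            {"a": 0.958069536790466, "b": 6.296118156595,
             "ReadingUpdatedDate": datetime(2013, 4, 2, 8, 26, 35, 297000)},
            {"a": 0.95574014778624, "b": 6.29651468650064,
             "ReadingUpdatedDate": datetime(2013, 4, 2, 8, 27, 9, 963000)},
        ],
    }
]

# One row per element of "Readings"; "sensorName" is repeated on every row.
readings = pd.json_normalize(docs, record_path="Readings", meta=["sensorName"])

# Index by timestamp so the sensor values can be resampled or plotted over time.
readings = readings.set_index("ReadingUpdatedDate").sort_index()
```

With a real collection, pymongo returns ISODate fields as datetime objects, so the same call should apply to list(my_collection.find()) unchanged.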
Answer 7
Using pandas.DataFrame(list(...)) will consume a lot of memory if the iterator/generator result is large. It is better to generate small chunks and concat them at the end:
import pandas as pd


def iterator2dataframes(iterator, chunk_size: int):
    """Turn an iterator into multiple small pandas.DataFrame

    This is a balance between memory and efficiency
    """
    records = []
    frames = []
    for i, record in enumerate(iterator):
        records.append(record)
        # Flush a full chunk into its own DataFrame
        if i % chunk_size == chunk_size - 1:
            frames.append(pd.DataFrame(records))
            records = []
    # Flush the final, partially filled chunk
    if records:
        frames.append(pd.DataFrame(records))
    # pd.concat raises ValueError on an empty list, so handle an empty iterator
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)
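If the combined result is still too large, a variation is to never concatenate at all and reduce each chunk as it arrives. The following is a minimal sketch of that idea, not part of the answer above: iter_frames is a hypothetical helper name, and fake_cursor is a plain generator standing in for a pymongo cursor, since any iterable of dicts behaves the same way here.

```python
from itertools import islice

import pandas as pd


def iter_frames(iterator, chunk_size):
    """Yield DataFrames of at most chunk_size records from any iterator."""
    iterator = iter(iterator)
    while True:
        # Pull the next chunk_size records without consuming the rest
        records = list(islice(iterator, chunk_size))
        if not records:
            break
        yield pd.DataFrame(records)


# Stand-in for a pymongo cursor: any iterable of dicts works.
fake_cursor = ({"a": float(i), "b": float(i) * 2} for i in range(250))

# Reduce each chunk to a partial sum, so only one chunk is in memory at a time.
chunk_sizes = []
total_a = 0.0
for frame in iter_frames(fake_cursor, chunk_size=100):
    chunk_sizes.append(len(frame))
    total_a += frame["a"].sum()
```

This trades the convenience of a single DataFrame for a bounded memory footprint, so it fits aggregations (sums, counts, min/max) better than operations that need all rows at once.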
Answer 8
Answer 9
Following the great answer by waitingkuo, I would like to add the possibility of reading in chunks with a chunksize parameter, in line with .read_sql() and .read_csv(). I extend the answer from Deu Leung by avoiding going through each 'record' of the 'iterator' / 'cursor' one by one.
I will borrow the previous read_mongo function.
import pandas as pd
from pymongo import MongoClient


def read_mongo(db,
               collection, query=None,
               host='localhost', port=27017,
               username=None, password=None,
               chunksize=100, no_id=True):
    """Read from Mongo in chunks and store the result in a DataFrame."""
    if query is None:
        query = {}
    # Connect to MongoDB
    client = MongoClient(host=host, port=port, username=username, password=password)
    # Select the specific DB and Collection
    coll = client[db][collection]
    # Cursor.count() was removed in PyMongo 4; count_documents replaces it
    total = coll.count_documents(query)
    frames = []
    # Iterate over the start offset of every chunk, including the last partial one
    for skip in range(0, total, int(chunksize)):
        # Sort by _id so that skip/limit pages do not overlap
        cursor = coll.find(query).sort('_id').skip(skip).limit(int(chunksize))
        df_aux = pd.DataFrame(list(cursor))
        if no_id and '_id' in df_aux.columns:
            del df_aux['_id']
        frames.append(df_aux)
    # Concatenate the chunks into a single DataFrame
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)
Answer 10
A similar approach to those of Rafael Valero, waitingkuo and Deu Leung, using pagination:
def read_mongo(db, collection, query=None,
               host='localhost', port=27017, username=None, password=None,
               chunksize=100, page_num=1, no_id=True):
    """Read one page of chunksize documents from Mongo into a DataFrame."""
    # Connect to MongoDB (uses the _connect_mongo helper from waitingkuo's answer)
    db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)
    # Calculate number of documents to skip
    skips = chunksize * (page_num - 1)
    # Avoid a mutable default argument. Sorry, this is in Spanish:
    # https://www.toptal.com/python/c%C3%B3digo-buggy-python-los-10-errores-m%C3%A1s-comunes-que-cometen-los-desarrolladores-python/es
    if not query:
        query = {}
    # Make a query to the specific DB and Collection
    cursor = db[collection].find(query).skip(skips).limit(chunksize)
    # Expand the cursor and construct the DataFrame
    df = pd.DataFrame(list(cursor))
    # Delete the _id column (absent when the page is empty)
    if no_id and '_id' in df.columns:
        del df['_id']
    return df
Answer 11
You can achieve what you want with pdmongo in three lines:
import pdmongo as pdm
import pandas as pd
df = pdm.read_mongo("MyCollection", [], "mongodb://localhost:27017/mydb")
If your data is very large, you can run an aggregate query first to filter out the data you do not want, then map the rest to your desired columns.
Here is an example of mapping Readings.a to column a and filtering by the reportCount column:
import pdmongo as pdm
import pandas as pd
pipeline = [
    {'$match': {'reportCount': {'$gt': 6}}},
    {'$unwind': '$Readings'},
    {'$project': {'a': '$Readings.a'}},
]
df = pdm.read_mongo("MyCollection", pipeline, "mongodb://localhost:27017/mydb")
read_mongo accepts the same arguments as pymongo's aggregate.
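The same pipeline can also be run with pymongo alone, without the extra dependency. This is a rough sketch under stated assumptions: readings_frame is a hypothetical helper name, the field names come from the sample document in the question, and the collection argument is expected to be a pymongo Collection (or anything else with an aggregate method).

```python
import pandas as pd


def readings_frame(collection, min_report_count=6):
    """Run the match/unwind/project pipeline and return the result as a DataFrame."""
    pipeline = [
        # Keep only reports with more than min_report_count readings
        {"$match": {"reportCount": {"$gt": min_report_count}}},
        # Emit one document per element of the Readings array
        {"$unwind": "$Readings"},
        # Keep only Readings.a, exposed as column "a"; drop _id
        {"$project": {"_id": 0, "a": "$Readings.a"}},
    ]
    return pd.DataFrame(list(collection.aggregate(pipeline)))
```

With a running server, this could be called as readings_frame(MongoClient("mongodb://localhost:27017")["mydb"]["MyCollection"]), where MongoClient is imported from pymongo and the database and collection names are the ones used in the pdmongo example. Filtering and projecting on the server keeps the amount of data transferred to pandas small.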