Tag Archives: mongodb

How do I import data from MongoDB into pandas?

Question: How do I import data from MongoDB into pandas?

I have a large amount of data in a collection in MongoDB which I need to analyze. How do I import that data into pandas?

I am new to pandas and numpy.

EDIT: The MongoDB collection contains sensor values tagged with date and time. The sensor values are of float datatype.

Sample data:

{
"_cls" : "SensorReport",
"_id" : ObjectId("515a963b78f6a035d9fa531b"),
"_types" : [
    "SensorReport"
],
"Readings" : [
    {
        "a" : 0.958069536790466,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:26:35.297Z"),
        "b" : 6.296118156595,
        "_cls" : "Reading"
    },
    {
        "a" : 0.95574014778624,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:27:09.963Z"),
        "b" : 6.29651468650064,
        "_cls" : "Reading"
    },
    {
        "a" : 0.953648289182713,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:27:37.545Z"),
        "b" : 7.29679823731148,
        "_cls" : "Reading"
    },
    {
        "a" : 0.955931884300997,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:28:21.369Z"),
        "b" : 6.29642922525632,
        "_cls" : "Reading"
    },
    {
        "a" : 0.95821381,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:41:20.801Z"),
        "b" : 7.28956613,
        "_cls" : "Reading"
    },
    {
        "a" : 4.95821335,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:41:36.931Z"),
        "b" : 6.28956574,
        "_cls" : "Reading"
    },
    {
        "a" : 9.95821341,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:42:09.971Z"),
        "b" : 0.28956488,
        "_cls" : "Reading"
    },
    {
        "a" : 1.95667927,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:43:55.463Z"),
        "b" : 0.29115237,
        "_cls" : "Reading"
    }
],
"latestReportTime" : ISODate("2013-04-02T08:43:55.463Z"),
"sensorName" : "56847890-0",
"reportCount" : 8
}


Answer 0

pymongo might give you a hand; the following is some code I'm using:

import pandas as pd
from pymongo import MongoClient


def _connect_mongo(host, port, username, password, db):
    """ A util for making a connection to mongo """

    if username and password:
        mongo_uri = 'mongodb://%s:%s@%s:%s/%s' % (username, password, host, port, db)
        conn = MongoClient(mongo_uri)
    else:
        conn = MongoClient(host, port)


    return conn[db]


def read_mongo(db, collection, query={}, host='localhost', port=27017, username=None, password=None, no_id=True):
    """ Read from Mongo and Store into DataFrame """

    # Connect to MongoDB
    db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)

    # Make a query to the specific DB and Collection
    cursor = db[collection].find(query)

    # Expand the cursor and construct the DataFrame
    df =  pd.DataFrame(list(cursor))

    # Delete the _id
    if no_id:
        del df['_id']

    return df


Answer 1

You can load your MongoDB data into a pandas DataFrame using this code. It works for me; hopefully it will for you too.

import pymongo
import pandas as pd
from pymongo import MongoClient
client = MongoClient()
db = client.database_name
collection = db.collection_name
data = pd.DataFrame(list(collection.find()))


Answer 2

Monary does exactly that, and it's super fast. (another link)

See this cool post, which includes a quick tutorial and some timings.

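For illustration, here is a rough sketch of Monary usage (not from the original answer; the host, database, collection, and field names are placeholders, and it assumes Monary's documented query(db, collection, query, fields, types) interface):

import pandas as pd
from monary import Monary

m = Monary("127.0.0.1")

# Pull two float64 columns straight into NumPy arrays, bypassing Python dicts
columns = ["a", "b"]
arrays = m.query("my_db", "my_collection", {}, columns, ["float64", "float64"])

df = pd.DataFrame(dict(zip(columns, arrays)))
m.close()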


Answer 3

As per the Zen of Python (PEP 20), simple is better than complex:

import pandas as pd
df = pd.DataFrame.from_records(db.<database_name>.<collection_name>.find())

You can include conditions just as you would when working with a regular MongoDB database, or even use find_one() to get only one element from the database, etc.

And voilà!



Answer 4

import pandas as pd
from odo import odo

data = odo('mongodb://localhost/db::collection', pd.DataFrame)

Answer 5

For dealing with out-of-core (not fitting into RAM) data efficiently (i.e. with parallel execution), you can try the Python Blaze ecosystem: Blaze / Dask / Odo.

Blaze (and Odo) has out-of-the-box functions to deal with MongoDB.

A few useful articles to start off:

And an article which shows what amazing things are possible with the Blaze stack: Analyzing 1.7 Billion Reddit Comments with Blaze and Impala (essentially, querying 975 GB of Reddit comments in seconds).

P.S. I'm not affiliated with any of these technologies.

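As a hedged sketch of what that can look like (assuming Blaze's Data constructor accepts the same mongodb://…::collection URI form that Odo uses in the answer above; the collection and field names are placeholders):

import pandas as pd
from blaze import Data
from odo import odo

# Point Blaze at a MongoDB collection; expressions are evaluated lazily
d = Data('mongodb://localhost/db::collection')

# Filter in the database, then materialize the result as a pandas DataFrame
df = odo(d[d.reportCount > 6], pd.DataFrame)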


Answer 6

Another option I found very useful is:

from pandas.io.json import json_normalize

cursor = my_collection.find()
df = json_normalize(cursor)

This way you get the unfolding of nested MongoDB documents for free.



Answer 7

Using

pandas.DataFrame(list(...))

will consume a lot of memory if the iterator/generator result is large.

It is better to generate small chunks and concat at the end:

import pandas as pd

def iterator2dataframes(iterator, chunk_size: int):
  """Turn an iterator into multiple small pandas.DataFrame

  This is a balance between memory and efficiency
  """
  records = []
  frames = []
  for i, record in enumerate(iterator):
    records.append(record)
    if i % chunk_size == chunk_size - 1:
      frames.append(pd.DataFrame(records))
      records = []
  if records:
    frames.append(pd.DataFrame(records))
  return pd.concat(frames)


Answer 8

http://docs.mongodb.org/manual/reference/mongoexport

Export to CSV and use read_csv, or export to JSON and use DataFrame.from_records().

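For example (a sketch only; the database, collection, and field names are placeholders):

# First export from the command line, e.g.:
#   mongoexport --db mydb --collection sensors --type=csv \
#       --fields sensorName,reportCount --out sensors.csv
import pandas as pd

df = pd.read_csv('sensors.csv')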


Answer 9

Following this great answer by waitingkuo, I would like to add the possibility of doing this using a chunksize, in line with .read_sql() and .read_csv(). I extend the answer from Deu Leung by avoiding going one by one over each 'record' of the 'iterator'/'cursor'. I will borrow the previous read_mongo function.

import pandas as pd
from pymongo import MongoClient


def read_mongo(db,
               collection, query={},
               host='localhost', port=27017,
               username=None, password=None,
               chunksize=100, no_id=True):
    """ Read from Mongo and Store into DataFrame """

    # Connect to MongoDB
    client = MongoClient(host=host, port=port)
    # Make a query to the specific DB and Collection
    db_aux = client[db]

    # Some variables to create the chunks
    skips_variable = range(0, db_aux[collection].find(query).count(), int(chunksize))
    if len(skips_variable) <= 1:
        skips_variable = [0, len(skips_variable)]

    # Iteration to create the dataframe in chunks.
    for i in range(1, len(skips_variable)):

        # Expand the cursor and construct the DataFrame
        df_aux = pd.DataFrame(list(db_aux[collection].find(query)[skips_variable[i-1]:skips_variable[i]]))

        if no_id:
            del df_aux['_id']

        # Concatenate the chunks into a unique df
        if 'df' not in locals():
            df = df_aux
        else:
            df = pd.concat([df, df_aux], ignore_index=True)

    return df


Answer 10

A similar approach to those of Rafael Valero, waitingkuo and Deu Leung, using pagination:

def read_mongo(
        db,
        collection, query=None,
        host='localhost', port=27017, username=None, password=None,
        chunksize=100, page_num=1, no_id=True):

    # Connect to MongoDB
    db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)

    # Calculate number of documents to skip
    skips = chunksize * (page_num - 1)

    # Sorry, this is in spanish
    # https://www.toptal.com/python/c%C3%B3digo-buggy-python-los-10-errores-m%C3%A1s-comunes-que-cometen-los-desarrolladores-python/es
    if not query:
        query = {}

    # Make a query to the specific DB and Collection
    cursor = db[collection].find(query).skip(skips).limit(chunksize)

    # Expand the cursor and construct the DataFrame
    df = pd.DataFrame(list(cursor))

    # Delete the _id
    if no_id:
        del df['_id']

    return df


Answer 11

You can achieve what you want with pdmongo in three lines:

import pdmongo as pdm
import pandas as pd
df = pdm.read_mongo("MyCollection", [], "mongodb://localhost:27017/mydb")

If your data is very large, you can run an aggregate query first, filtering out the data you do not want, then map it to your desired columns.

Here is an example of mapping Readings.a to column a and filtering by the reportCount column:

import pdmongo as pdm
import pandas as pd
df = pdm.read_mongo("MyCollection", [{'$match': {'reportCount': {'$gt': 6}}}, {'$unwind': '$Readings'}, {'$project': {'a': '$Readings.a'}}], "mongodb://localhost:27017/mydb")

read_mongo accepts the same arguments as pymongo's aggregate.



TypeError: ObjectId('') is not JSON serializable

Question: TypeError: ObjectId('') is not JSON serializable

After querying an aggregation function on a document collection using Python, I get a valid response back from MongoDB: I can print it, but I cannot return it.

Error:

TypeError: ObjectId('51948e86c25f4b1d1c0d303c') is not JSON serializable

Print:

{'result': [{'_id': ObjectId('51948e86c25f4b1d1c0d303c'), 'api_calls_with_key': 4, 'api_calls_per_day': 0.375, 'api_calls_total': 6, 'api_calls_without_key': 2}], 'ok': 1.0}

But when I try to return it:

TypeError: ObjectId('51948e86c25f4b1d1c0d303c') is not JSON serializable

This is the RESTful call:

@appv1.route('/v1/analytics')
def get_api_analytics():
    # get handle to collections in MongoDB
    statistics = sldb.statistics

    objectid = ObjectId("51948e86c25f4b1d1c0d303c")

    analytics = statistics.aggregate([
    {'$match': {'owner': objectid}},
    {'$project': {'owner': "$owner",
    'api_calls_with_key': {'$cond': [{'$eq': ["$apikey", None]}, 0, 1]},
    'api_calls_without_key': {'$cond': [{'$ne': ["$apikey", None]}, 0, 1]}
    }},
    {'$group': {'_id': "$owner",
    'api_calls_with_key': {'$sum': "$api_calls_with_key"},
    'api_calls_without_key': {'$sum': "$api_calls_without_key"}
    }},
    {'$project': {'api_calls_with_key': "$api_calls_with_key",
    'api_calls_without_key': "$api_calls_without_key",
    'api_calls_total': {'$add': ["$api_calls_with_key", "$api_calls_without_key"]},
    'api_calls_per_day': {'$divide': [{'$add': ["$api_calls_with_key", "$api_calls_without_key"]}, {'$dayOfMonth': datetime.now()}]},
    }}
    ])


    print(analytics)

    return analytics

The DB is well connected and the collection is there too, and I get back the valid expected result, but when I try to return it, it gives me a JSON error. Any idea how to convert the response back into JSON? Thanks.



Answer 0

You should define your own JSONEncoder and use it:

import json
from bson import ObjectId

class JSONEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, ObjectId):
            return str(o)
        return json.JSONEncoder.default(self, o)

JSONEncoder().encode(analytics)

It's also possible to use it in the following way:

json.dumps(analytics, cls=JSONEncoder)


Answer 1

Pymongo provides json_util – you can use that one instead to handle BSON types:


import json
from bson import json_util

def parse_json(data):
    return json.loads(json_util.dumps(data))

Answer 2

>>> from bson import Binary, Code
>>> from bson.json_util import dumps
>>> dumps([{'foo': [1, 2]},
...        {'bar': {'hello': 'world'}},
...        {'code': Code("function x() { return 1; }")},
...        {'bin': Binary(b"\x01\x02\x03\x04")}])
'[{"foo": [1, 2]}, {"bar": {"hello": "world"}}, {"code": {"$code": "function x() { return 1; }", "$scope": {}}}, {"bin": {"$binary": "AQIDBA==", "$type": "00"}}]'

An actual example from json_util.

Unlike Flask's jsonify, "dumps" will return a string, so it cannot be used as a 1:1 replacement for Flask's jsonify.

But this question shows that we can serialize using json_util.dumps(), convert back to a dict using json.loads(), and finally call Flask's jsonify on it.

Example (derived from a previous question's answer):

from bson import json_util, ObjectId
import json

#Lets create some dummy document to prove it will work
page = {'foo': ObjectId(), 'bar': [ObjectId(), ObjectId()]}

#Dump loaded BSON to valid JSON string and reload it as dict
page_sanitized = json.loads(json_util.dumps(page))
return page_sanitized

This solution will convert ObjectId and other types (i.e. Binary, Code, etc.) to a string equivalent such as "$oid".

JSON output would look like this:

{
  "_id": {
    "$oid": "abc123"
  }
}

Answer 3

Most users who receive the "not JSON serializable" error simply need to specify default=str when calling json.dumps. For example:

json.dumps(my_obj, default=str)

This will force a conversion to str, preventing the error. Of course, then look at the generated output to confirm that it is what you need.



Answer 4

from bson import json_util
import json

@app.route('/')
def index():
    for doc in collection_name.find():
        return json.dumps(doc, indent=4, default=json_util.default)

This is a sample example of converting BSON into a JSON object. You can try it.



Answer 5

As a quick replacement, you can change {'owner': objectid} to {'owner': str(objectid)}.

But defining your own JSONEncoder is a better solution; it depends on your requirements.



Answer 6

Posting here as I think it may be useful for people using Flask with pymongo. This is my current "best practice" setup for allowing Flask to marshal pymongo BSON data types.

mongoflask.py

from datetime import datetime, date

import isodate as iso
from bson import ObjectId
from flask.json import JSONEncoder
from werkzeug.routing import BaseConverter


class MongoJSONEncoder(JSONEncoder):
    def default(self, o):
        if isinstance(o, (datetime, date)):
            return iso.datetime_isoformat(o)
        if isinstance(o, ObjectId):
            return str(o)
        else:
            return super().default(o)


class ObjectIdConverter(BaseConverter):
    def to_python(self, value):
        return ObjectId(value)

    def to_url(self, value):
        return str(value)

app.py

from .mongoflask import MongoJSONEncoder, ObjectIdConverter

def create_app():
    app = Flask(__name__)
    app.json_encoder = MongoJSONEncoder
    app.url_map.converters['objectid'] = ObjectIdConverter

    # Client sends their string, we interpret it as an ObjectId
    @app.route('/users/<objectid:user_id>')
    def show_user(user_id):
        # setup not shown, pretend this gets us a pymongo db object
        db = get_db()

        # user_id is a bson.ObjectId ready to use with pymongo!
        result = db.users.find_one({'_id': user_id})

        # And jsonify returns normal looking json!
        # {"_id": "5b6b6959828619572d48a9da",
        #  "name": "Will",
        #  "birthday": "1990-03-17T00:00:00Z"}
        return jsonify(result)


    return app

Why do this instead of serving BSON or MongoDB extended JSON?

I think serving Mongo's special JSON puts a burden on client applications. Most client apps will not care about using Mongo objects in any complex way. If I serve extended JSON, now I have to use it both server side and client side. ObjectId and Timestamp are easier to work with as strings, and this keeps all the Mongo marshalling madness quarantined to the server.

{
  "_id": "5b6b6959828619572d48a9da",
  "created_at": "2018-08-08T22:06:17Z"
}

I think this is less onerous to work with for most applications than:

{
  "_id": {"$oid": "5b6b6959828619572d48a9da"},
  "created_at": {"$date": 1533837843000}
}


Answer 7

This is how I recently fixed the error:

    @app.route('/')
    def home():
        docs = []
        for doc in db.person.find():
            doc.pop('_id') 
            docs.append(doc)
        return jsonify(docs)


Answer 8

I know I'm posting late, but I thought it would help at least a few folks!

Both of the examples mentioned by tim and defuz (which are top voted) work perfectly fine. However, there is a minor difference which could be significant at times.

  1. The following method adds one extra field, which is redundant and may not be ideal in all cases:

Pymongo provides json_util – you can use that one instead to handle BSON types

Output: { "_id": { "$oid": "abc123" } }

  2. Whereas the JSONEncoder class gives the output in the plain string format we need, we additionally have to call json.loads(output). But it leads to

Output: { "_id": "abc123" }

Even though the first method looks simpler, both methods need very minimal effort.



Answer 9

In my case I needed something like this:

class JsonEncoder():
    def encode(self, o):
        if '_id' in o:
            o['_id'] = str(o['_id'])
        return o


Answer 10

Flask's jsonify provides a security enhancement, as described in JSON Security. If a custom encoder is used with Flask, it's better to consider the points discussed in JSON Security.


Answer 11

I would like to provide an additional solution that improves on the accepted answer. I have previously provided an answer in another thread here.

from flask import Flask
from flask.json import JSONEncoder

from bson import json_util

from . import resources

# define a custom encoder point to the json_util provided by pymongo (or its dependency bson)
class CustomJSONEncoder(JSONEncoder):
    def default(self, obj): return json_util.default(obj)

application = Flask(__name__)
application.json_encoder = CustomJSONEncoder

if __name__ == "__main__":
    application.run()


Answer 12

If you will not be needing the _id of the records, I recommend unsetting it when querying the DB, which will enable you to print the returned records directly.

To unset the _id when querying and then print the data in a loop, write something like this:

records = mycollection.find(query, {'_id': 0}) #second argument {'_id':0} unsets the id from the query
for record in records:
    print(record)


Answer 13

SOLUTION for: mongoengine + marshmallow

If you use mongoengine and marshmallow, then this solution might be applicable for you.

Basically, I imported the String field from marshmallow, and I overwrote the default Schema id to be String encoded.

from marshmallow import Schema
from marshmallow.fields import String

class FrontendUserSchema(Schema):

    id = String()

    class Meta:
        fields = ("id", "email")


Answer 14

from bson.objectid import ObjectId
from core.services.db_connection import DbConnectionService

class DbExecutionService:
    def __init__(self):
        self.db = DbConnectionService()

    def list(self, collection, search):
        session = self.db.create_connection(collection)
        return list(map(lambda row: {i: str(row[i]) if isinstance(row[i], ObjectId) else row[i] for i in row}, session.find(search)))

Answer 15

If you don't want _id in the response, you can refactor your code like this:

jsonResponse = getResponse(mock_data)
del jsonResponse['_id'] # removes '_id' from the final response
return jsonResponse

This will remove the TypeError: ObjectId('') is not JSON serializable error.



Using .sort in PyMongo

Question: Using .sort in PyMongo

With PyMongo, when I try to retrieve objects sorted by their 'number' and 'date' fields like this:

db.test.find({"number": {"$gt": 1}}).sort({"number": 1, "date": -1})

I get this error:

TypeError: if no direction is specified, key_or_list must be an instance of list

What's wrong with my sort query?



Answer 0

sort should be a list of key-direction pairs, that is:

db.test.find({"number": {"$gt": 1}}).sort([("number", 1), ("date", -1)])

The reason this has to be a list is that the ordering of the arguments matters, and dicts are not ordered in Python < 3.6.



How to sort MongoDB with pymongo

Question: How to sort MongoDB with pymongo

I'm trying to use the sort feature when querying my MongoDB, but it is failing. The same query works in the MongoDB console but not here. The code is as follows:

import pymongo

from  pymongo import Connection
connection = Connection()
db = connection.myDB
print db.posts.count()
for post in db.posts.find({}, {'entities.user_mentions.screen_name':1}).sort({u'entities.user_mentions.screen_name':1}):
    print post

The error I get is as follows:

Traceback (most recent call last):
  File "find_ow.py", line 7, in <module>
    for post in db.posts.find({}, {'entities.user_mentions.screen_name':1}).sort({'entities.user_mentions.screen_name':1},1):
  File "/Library/Python/2.6/site-packages/pymongo-2.0.1-py2.6-macosx-10.6-universal.egg/pymongo/cursor.py", line 430, in sort
  File "/Library/Python/2.6/site-packages/pymongo-2.0.1-py2.6-macosx-10.6-universal.egg/pymongo/helpers.py", line 67, in _index_document
TypeError: first item in each key pair must be a string

I found a link elsewhere that says I need to place a 'u' in front of the key if using pymongo, but that didn't work either. Has anyone else gotten this to work, or is this a bug?



Answer 0

.sort(), in pymongo, takes key and direction as parameters.

So if you want to sort by, let's say, id, then you should .sort("_id", 1).

For multiple fields:

.sort([("field1", pymongo.ASCENDING), ("field2", pymongo.DESCENDING)])


Answer 1

You can try this:

db.Account.find().sort("UserName")  
db.Account.find().sort("UserName",pymongo.ASCENDING)   
db.Account.find().sort("UserName",pymongo.DESCENDING)  


Answer 2

This also works:

db.Account.find().sort('UserName', -1)
db.Account.find().sort('UserName', 1)

I'm using this in my code; please comment if I'm doing something wrong here. Thanks.



Answer 3

Why does Python use a list of tuples instead of a dict?

In Python, you cannot guarantee that the dictionary will be interpreted in the order you declared it.

So, in the mongo shell you could do .sort({'field1': 1, 'field2': 1}) and the interpreter would sort by field1 at the first level and field2 at the second level.

If this syntax were used in Python, there would be a chance of sorting by field2 at the first level. With a tuple, there is no such risk:

.sort([("field1",pymongo.ASCENDING), ("field2",pymongo.DESCENDING)])


Answer 4

.sort([("field1",pymongo.ASCENDING), ("field2",pymongo.DESCENDING)])

Python uses key, direction pairs; you can use the form above.

So in your case you can do this:

for post in db.posts.find().sort('entities.user_mentions.screen_name',pymongo.ASCENDING):
        print post

Answer 5

TL;DR: The aggregation pipeline is faster than the conventional .find().sort().

Now moving on to the real explanation. There are two ways to perform sorting operations in MongoDB:

  1. Using .find() and .sort().
  2. Or using the aggregation pipeline.

As suggested by many, .find().sort() is the simplest way to perform sorting.

.sort([("field1",pymongo.ASCENDING), ("field2",pymongo.DESCENDING)])

However, this is slow compared to the aggregation pipeline.

Coming to the aggregation pipeline method, the steps to implement a simple aggregation pipeline intended for sorting are:

  1. $match (optional step)
  2. $sort

NOTE: In my experience, the aggregation pipeline works a bit faster than the .find().sort() method.

Here's an example of the aggregation pipeline:

db.collection_name.aggregate([{
    "$match": {
        # your query - optional step
    }
},
{
    "$sort": {
        "field_1": pymongo.ASCENDING,
        "field_2": pymongo.DESCENDING,
        ....
    }
}])

Try this method yourself, compare the speed, and let me know about it in the comments.

Edit: Do not forget to use allowDiskUse=True while sorting on multiple fields, otherwise it will throw an error.



Answer 6

Say you want to sort by the 'created_on' field; you can do it like this:

.sort('{}'.format('created_on'), 1 if sort_type == 'asc' else -1)


MongoDB: insert if not exists

Question: MongoDB: insert if not exists

Every day, I receive a stock of documents (an update). What I want to do is insert each item that does not already exist.

  • I also want to keep track of the first time I inserted them and the last time I saw them in an update.
  • I don't want to have duplicate documents.
  • I don't want to remove a document which has previously been saved, but is not in my update.
  • 95% (estimated) of the records are unmodified from day to day.

I am using the Python driver (pymongo).

What I currently do is (pseudo-code):

for each document in update:
      existing_document = collection.find_one(document)
      if not existing_document:
           document['insertion_date'] = now
      else:
           document = existing_document
      document['last_update_date'] = now
      my_collection.save(document)

My problem is that it is very slow (40 minutes for fewer than 100,000 records, and I have millions of them in the update). I am pretty sure there is something built in for doing this, but the documentation for update() is, mmmhhh… a bit terse… (http://www.mongodb.org/display/DOCS/Updating)

Can someone advise how to do it faster?



Answer 0

It sounds like you want to do an "upsert". MongoDB has built-in support for this. Pass an extra parameter to your update() call: {upsert: true}. For example:

key = {'key':'value'}
data = {'key2':'value2', 'key3':'value3'};
coll.update(key, data, upsert=True); #In python upsert must be passed as a keyword argument

This replaces your if-find-else-update block entirely. It will insert if the key doesn't exist and will update if it does.

Before:

{"key":"value", "key2":"Ohai."}

After:

{"key":"value", "key2":"value2", "key3":"value3"}

You can also specify what data you want to write:

data = {"$set":{"key2":"value2"}}

Now your selected document will update the value of "key2" only and leave everything else untouched.



Answer 1

As of MongoDB 2.4, you can use $setOnInsert (http://docs.mongodb.org/manual/reference/operator/setOnInsert/).

Set 'insertion_date' using $setOnInsert and 'last_update_date' using $set in your upsert command.

To turn your pseudocode into a working example:

now = datetime.utcnow()
for document in update:
    collection.update_one(
        {"_id": document["_id"]},
        {
            "$setOnInsert": {"insertion_date": now},
            "$set": {"last_update_date": now},
        },
        upsert=True,
    )


Answer 2

You could always make a unique index, which causes MongoDB to reject a conflicting save. Consider the following, done using the mongodb shell:

> db.getCollection("test").insert ({a:1, b:2, c:3})
> db.getCollection("test").find()
{ "_id" : ObjectId("50c8e35adde18a44f284e7ac"), "a" : 1, "b" : 2, "c" : 3 }
> db.getCollection("test").ensureIndex ({"a" : 1}, {unique: true})
> db.getCollection("test").insert({a:2, b:12, c:13})      # This works
> db.getCollection("test").insert({a:1, b:12, c:13})      # This fails
E11000 duplicate key error index: foo.test.$a_1  dup key: { : 1.0 }


Answer 3

You may use an upsert with the $setOnInsert operator:

db.Table.update({noExist: true}, {"$setOnInsert": {xxxYourDocumentxxx}}, {upsert: true})


Answer 4

1. Use Update.

Drawing from Van Nguyen's answer above, use update instead of save. This gives you access to the upsert option.

NOTE: This method overrides the entire document when found (from the docs).

var conditions = { name: 'borne' },
    update = { $inc: { visits: 1 }},
    options = { multi: true };

Model.update(conditions, update, options, callback);

function callback (err, numAffected) {
    // numAffected is the number of updated documents
}

1.a. Use $set

If you want to update a selection of the document, but not the whole thing, you can use the $set method with update (again, from the docs). So, if you want to set…

var query = { name: 'borne' };
Model.update(query, { name: 'jason borne' }, options, callback)

Send it as…

Model.update(query, { $set: { name: 'jason borne' }}, options, callback)

This helps prevent accidentally overwriting all of your document(s) with { name: 'jason borne' }.



Answer 5

Summary

  • You have an existing collection of records.
  • You have a set of records that contain updates to the existing records.
  • Some of the updates don't really update anything; they duplicate what you have already.
  • All updates contain the same fields that are there already, just possibly with different values.
  • You want to track when a record was last changed, i.e. when a value actually changed.

Note, I'm presuming PyMongo; change to suit your language of choice.

Instructions:

  1. Create the collection with an index with unique=true so you don't get duplicate records.

  2. Iterate over your input records, creating batches of about 15,000 records each. For each record in the batch, create a dict consisting of the data you want to insert, presuming each one is going to be a new record. Add the 'created' and 'updated' timestamps to these. Issue this as a batch insert command with the 'ContinueOnError' flag = true, so the insert of everything else happens even if there's a duplicate key in there (which it sounds like there will be). THIS WILL HAPPEN VERY FAST. Bulk inserts rock; I've gotten 15k/second performance levels. For further notes on ContinueOnError, see http://docs.mongodb.org/manual/core/write-operations/ (a rough PyMongo sketch of this step follows after this answer).

     Record inserts happen VERY fast, so you'll be done with those inserts in no time. Now, it's time to update the relevant records. Do this with a batch retrieval, much faster than one at a time.

  3. Iterate over all your input records again, creating batches of 15K or so. Extract out the keys (best if there's one key, but it can't be helped if there isn't). Retrieve this bunch of records from Mongo with a db.collectionNameBlah.find({ field : { $in : [ 1, 2, 3 … ] } }) query. For each of these records, determine if there's an update, and if so, issue the update, including updating the 'updated' timestamp.

     Unfortunately, we should note that MongoDB 2.4 and below do NOT include a bulk update operation. They're working on that.

Key Optimization Points:

  • The inserts will vastly speed up your operations in bulk.
  • Retrieving records en masse will speed things up, too.
  • Individual updates are the only possible route now, but 10gen is working on it. Presumably this will be in 2.6, though I'm not sure if it will be finished by then; there's a lot of stuff to do (I've been following their Jira system).

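A minimal sketch of step 2, assuming a modern PyMongo where insert_many(..., ordered=False) plays the role of the old ContinueOnError flag (the collection name, key field, and update_records variable are placeholders):

from datetime import datetime

from pymongo import MongoClient
from pymongo.errors import BulkWriteError

client = MongoClient()
coll = client.mydb.documents

# Step 1: a unique index makes MongoDB reject duplicates for us
coll.create_index('key', unique=True)

now = datetime.utcnow()
batch = [dict(rec, created=now, updated=now) for rec in update_records]

try:
    # ordered=False keeps inserting past duplicate-key errors,
    # like the old ContinueOnError flag
    coll.insert_many(batch, ordered=False)
except BulkWriteError:
    pass  # duplicates were rejected by the unique index; the rest went in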

Answer 6

I don't think MongoDB supports this type of selective upserting. I have the same problem as LeMiz: using update(criteria, newObj, upsert, multi) doesn't work right when dealing with both a 'created' and an 'updated' timestamp. Given the following upsert statement:

update( { "name": "abc" }, 
        { $set: { "created": "2010-07-14 11:11:11", 
                  "updated": "2010-07-14 11:11:11" }},
        true, true ) 

Scenario #1 – a document with 'name' of 'abc' does not exist: a new document is created with 'name' = 'abc', 'created' = 2010-07-14 11:11:11, and 'updated' = 2010-07-14 11:11:11.

Scenario #2 – a document with 'name' of 'abc' already exists with the following: 'name' = 'abc', 'created' = 2010-07-12 09:09:09, and 'updated' = 2010-07-13 10:10:10. After the upsert, the document would now be the same as the result in scenario #1. There's no way to specify in an upsert which fields should be set when inserting, and which fields should be left alone when updating.

My solution was to create a unique index on the criteria fields, perform an insert, and immediately afterward perform an update just on the 'updated' field (a sketch follows below).

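A minimal PyMongo sketch of that insert-then-update approach (the collection and field names are placeholders):

from datetime import datetime

from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError

coll = MongoClient().mydb.things
coll.create_index('name', unique=True)  # unique index on the criteria field

now = datetime.utcnow()
try:
    # Only succeeds the first time this name is ever seen
    coll.insert_one({'name': 'abc', 'created': now, 'updated': now})
except DuplicateKeyError:
    pass  # already there; 'created' stays untouched

# Always refresh 'updated', whether or not the insert happened
coll.update_one({'name': 'abc'}, {'$set': {'updated': now}})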


Answer 7

In general, using update is better in MongoDB, as it will just create the document if it doesn't exist yet, though I'm not sure how to do that with your Python adapter.

Second, if you only need to know whether or not the document exists, count(), which returns only a number, will be a better option than find_one, which transfers the whole document from MongoDB, causing unnecessary traffic.

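For illustration, a sketch of both points using a modern PyMongo (count_documents replaces the deprecated cursor count(); the collection and field names are placeholders):

# Upsert: creates the document when missing, updates it when present
coll.update_one({'key': 'value'}, {'$set': {'key2': 'value2'}}, upsert=True)

# Existence check without transferring the whole document
exists = coll.count_documents({'key': 'value'}, limit=1) > 0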


"Large data" workflows using pandas

Question: "Large data" workflows using pandas

I have tried to puzzle out an answer to this question for many months while learning pandas. I use SAS for my day-to-day work, and it is great for its out-of-core support. However, SAS is horrible as a piece of software for numerous other reasons.

One day I hope to replace my use of SAS with Python and pandas, but I currently lack an out-of-core workflow for large datasets. I'm not talking about "big data" that requires a distributed network, but rather files too large to fit in memory yet small enough to fit on a hard drive.

My first thought is to use HDFStore to hold large datasets on disk and pull only the pieces I need into dataframes for analysis. Others have mentioned MongoDB as an easier-to-use alternative. My question is this:

What are some best-practice workflows for accomplishing the following:

  1. Loading flat files into a permanent, on-disk database structure
  2. Querying that database to retrieve data to feed into a pandas data structure
  3. Updating the database after manipulating pieces in pandas

Real-world examples would be much appreciated, especially from anyone who uses pandas on "large data".

Edit — an example of how I would like this to work:

  1. Iteratively import a large flat file and store it in a permanent, on-disk database structure. These files are typically too large to fit in memory.
  2. In order to use pandas, I would like to read subsets of this data (usually just a few columns at a time) that can fit in memory.
  3. I would create new columns by performing various operations on the selected columns.
  4. I would then have to append these new columns into the database structure.

I am trying to find a best-practice way of performing these steps. Reading links about pandas and pytables, it seems that appending a new column could be a problem.

Edit — responding to Jeff's questions specifically:

  1. I am building consumer credit risk models. The kinds of data include phone, SSN and address characteristics; property values; derogatory information like criminal records, bankruptcies, etc… The datasets I use every day have nearly 1,000 to 2,000 fields on average of mixed data types: continuous, nominal and ordinal variables of both numeric and character data. I rarely append rows, but I do perform many operations that create new columns.
  2. Typical operations involve combining several columns using conditional logic into a new, compound column. For example, if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B'. The result of these operations is a new column for every record in my dataset. (A short pandas sketch of this kind of operation appears at the end of this question.)
  3. Finally, I would like to append these new columns into the on-disk data structure. I would repeat step 2, exploring the data with crosstabs and descriptive statistics, trying to find interesting, intuitive relationships to model.
  4. A typical project file is usually about 1 GB. Files are organized in such a manner that a row consists of a record of consumer data. Each row has the same number of columns for every record. This will always be the case.
  5. It's pretty rare that I would subset by rows when creating a new column. However, it's pretty common for me to subset on rows when creating reports or generating descriptive statistics. For example, I might want to create a simple frequency for a specific line of business, say retail credit cards. To do this, I would select only those records where the line of business = retail, in addition to whichever columns I want to report on. When creating new columns, however, I would pull all rows of data and only the columns I need for the operations.
  6. The modeling process requires that I analyze every column, look for interesting relationships with some outcome variable, and create new compound columns that describe those relationships. The columns that I explore are usually done in small sets. For example, I will focus on a set of, say, 20 columns just dealing with property values and observe how they relate to defaulting on a loan. Once those are explored and new columns are created, I then move on to another group of columns, say college education, and repeat the process. What I'm doing is creating candidate variables that explain the relationship between my data and some outcome. At the very end of this process, I apply some learning techniques that create an equation out of those compound columns.

It is rare that I would ever add rows to the dataset. I will nearly always be creating new columns (variables or features in statistics/machine-learning parlance).

I have tried to puzzle out an answer to this question for many months while learning pandas. I use SAS for my day-to-day work and it is great for it’s out-of-core support. However, SAS is horrible as a piece of software for numerous other reasons.

One day I hope to replace my use of SAS with python and pandas, but I currently lack an out-of-core workflow for large datasets. I’m not talking about “big data” that requires a distributed network, but rather files too large to fit in memory but small enough to fit on a hard-drive.

My first thought is to use HDFStore to hold large datasets on disk and pull only the pieces I need into dataframes for analysis. Others have mentioned MongoDB as an easier to use alternative. My question is this:

What are some best-practice workflows for accomplishing the following:

  1. Loading flat files into a permanent, on-disk database structure
  2. Querying that database to retrieve data to feed into a pandas data structure
  3. Updating the database after manipulating pieces in pandas

Real-world examples would be much appreciated, especially from anyone who uses pandas on “large data”.

Edit — an example of how I would like this to work:

  1. Iteratively import a large flat-file and store it in a permanent, on-disk database structure. These files are typically too large to fit in memory.
  2. In order to use Pandas, I would like to read subsets of this data (usually just a few columns at a time) that can fit in memory.
  3. I would create new columns by performing various operations on the selected columns.
  4. I would then have to append these new columns into the database structure.

I am trying to find a best-practice way of performing these steps. Reading links about pandas and pytables it seems that appending a new column could be a problem.

Edit — Responding to Jeff’s questions specifically:

  1. I am building consumer credit risk models. The kinds of data include phone, SSN and address characteristics; property values; derogatory information like criminal records, bankruptcies, etc… The datasets I use every day have nearly 1,000 to 2,000 fields on average of mixed data types: continuous, nominal and ordinal variables of both numeric and character data. I rarely append rows, but I do perform many operations that create new columns.
  2. Typical operations involve combining several columns using conditional logic into a new, compound column. For example, if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B'. The result of these operations is a new column for every record in my dataset.
  3. Finally, I would like to append these new columns into the on-disk data structure. I would repeat step 2, exploring the data with crosstabs and descriptive statistics trying to find interesting, intuitive relationships to model.
  4. A typical project file is usually about 1GB. Files are organized in such a manner that a row consists of a record of consumer data. Each row has the same number of columns for every record. This will always be the case.
  5. It’s pretty rare that I would subset by rows when creating a new column. However, it’s pretty common for me to subset on rows when creating reports or generating descriptive statistics. For example, I might want to create a simple frequency for a specific line of business, say Retail credit cards. To do this, I would select only those records where the line of business = retail in addition to whichever columns I want to report on. When creating new columns, however, I would pull all rows of data and only the columns I need for the operations.
  6. The modeling process requires that I analyze every column, look for interesting relationships with some outcome variable, and create new compound columns that describe those relationships. The columns that I explore are usually done in small sets. For example, I will focus on a set of say 20 columns just dealing with property values and observe how they relate to defaulting on a loan. Once those are explored and new columns are created, I then move on to another group of columns, say college education, and repeat the process. What I’m doing is creating candidate variables that explain the relationship between my data and some outcome. At the very end of this process, I apply some learning techniques that create an equation out of those compound columns.

It is rare that I would ever add rows to the dataset. I will nearly always be creating new columns (variables or features in statistics/machine learning parlance).


Answer 0

I routinely use tens of gigabytes of data in just this fashion e.g. I have tables on disk that I read via queries, create data and append back.

It’s worth reading the docs and the answers late in this thread for several suggestions on how to store your data.

Give as much detail as you can, and I can help you develop a structure. Details that will affect how you store your data include:

  1. Size of data, # of rows, columns, types of columns; are you appending rows, or just columns?
  2. What will typical operations look like? E.g. do a query on columns to select a bunch of rows and specific columns, then do an operation (in-memory), create new columns, save these.
    (Giving a toy example could enable us to offer more specific recommendations.)
  3. After that processing, then what do you do? Is step 2 ad hoc, or repeatable?
  4. Input flat files: how many, and rough total size in GB? How are these organized, e.g. by records? Does each one contain different fields, or do they have some records per file with all of the fields in each file?
  5. Do you ever select subsets of rows (records) based on criteria (e.g. select the rows with field A > 5) and then do something, or do you just select fields A, B, C with all of the records (and then do something)?
  6. Do you ‘work on’ all of your columns (in groups), or is there a good proportion that you may only use for reports (e.g. you want to keep the data around, but don’t need to pull in that column explicitly until final results time)?

Solution

Ensure you have pandas at least 0.10.1 installed.

Read the files in iteratively, chunk-by-chunk, and use multiple-table queries.

Since pytables is optimized to operate row-wise (which is what you query on), we will create a table for each group of fields. This way it’s easy to select a small group of fields (this would work with one big table, but it’s more efficient this way… I think I may be able to fix this limitation in the future… it’s more intuitive anyhow):
(The following is pseudocode.)

import numpy as np
import pandas as pd

# create a store
store = pd.HDFStore('mystore.h5')

# this is the key to your storage:
#    this maps your fields to a specific group, and defines 
#    what you want to have as data_columns.
#    you might want to create a nice class wrapping this
#    (as you will want to have this map and its inversion)  
group_map = dict(
    A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
    B = dict(fields = ['field_10',......        ], dc = ['field_10']),
    .....
    REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),

)

group_map_inverted = dict()
for g, v in group_map.items():
    group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))

Reading in the files and creating the storage (essentially doing what append_to_multiple does):

for f in files:
   # read in the file, additional options may be necessary here
   # the chunksize is not strictly necessary, you may be able to slurp each 
   # file into memory in which case just eliminate this part of the loop 
   # (you can also change chunksize if necessary)
   for chunk in pd.read_table(f, chunksize=50000):
       # we are going to append to each table by group
       # we are not going to create indexes at this time
       # but we *ARE* going to create (some) data_columns

       # figure out the field groupings
       for g, v in group_map.items():
             # create the frame for this group
             frame = chunk.reindex(columns = v['fields'], copy = False)    

             # append it
             store.append(g, frame, index=False, data_columns = v['dc'])

Now you have all of the tables in the file (actually you could store them in separate files if you wish; you would probably have to add the filename to the group_map, but probably this isn’t necessary).

This is how you get columns and create new ones:

frame = store.select(group_that_I_want)
# you can optionally specify:
# columns = a list of the columns IN THAT GROUP (if you wanted to
#     select only say 3 out of the 20 columns in this sub-table)
# and a where clause if you want a subset of the rows

# do calculations on this frame
new_frame = cool_function_on_frame(frame)

# to 'add columns', create a new group (you probably want to
# limit the columns in this new_group to be only NEW ones,
# e.g. so you don't overlap with the other tables);
# add this info to the group_map
store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)

When you are ready for post-processing:

# This may be a bit tricky; and depends what you are actually doing.
# I may need to modify this function to be a bit more general:
report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)

About data_columns, you don’t actually need to define ANY data_columns; they allow you to sub-select rows based on the column. E.g. something like:

store.select(group, where = ['field_1000=foo', 'field_1001>0'])

They may be most interesting to you in the final report generation stage (essentially a data column is segregated from other columns, which might impact efficiency somewhat if you define a lot).

You also might want to:

  • create a function which takes a list of fields, looks up the groups in the groups_map, then selects these and concatenates the results so you get the resulting frame (this is essentially what select_as_multiple does); see the sketch after this list. This way the structure would be pretty transparent to you.
  • create indexes on certain data columns (this makes row-subsetting much faster).
  • enable compression.
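
A minimal sketch of such a helper, assuming the group_map_inverted mapping built above (everything else is hypothetical):

import pandas as pd

def select_fields(store, fields, where=None):
    # look up which group (sub-table) each requested field lives in
    groups = {}
    for f in fields:
        groups.setdefault(group_map_inverted[f], []).append(f)
    # select just those columns from each group, then concat column-wise;
    # rows line up because every group was appended from the same chunks
    frames = [store.select(g, columns=cols, where=where)
              for g, cols in groups.items()]
    return pd.concat(frames, axis=1)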

Let me know when you have questions!


Answer 1

I think the answers above are missing a simple approach that I’ve found very useful.

When I have a file that is too large to load in memory, I break up the file into multiple smaller files (either by rows or by columns).

Example: in the case of 30 days’ worth of trading data of ~30GB, I break it into one file per day of ~1GB. I subsequently process each file separately and aggregate the results at the end.

One of the biggest advantages is that it allows parallel processing of the files (either multiple threads or processes).

The other advantage is that file manipulation (like adding/removing dates in the example) can be accomplished by regular shell commands, which is not possible with more advanced/complicated file formats.

This approach doesn’t cover all scenarios, but is very useful in a lot of them.
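
A minimal sketch of the split-then-parallelize idea (the file layout, column names and the process_day reduction are all hypothetical):

from multiprocessing import Pool
import glob
import pandas as pd

def process_day(path):
    # each worker loads one ~1GB daily file and reduces it to a small result
    df = pd.read_csv(path)
    return df.groupby("symbol")["volume"].sum()

if __name__ == "__main__":
    files = sorted(glob.glob("trades-2013-04-*.csv"))
    with Pool(4) as pool:
        partials = pool.map(process_day, files)
    # aggregate the per-day partial results at the end
    total = pd.concat(partials).groupby(level=0).sum()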


Answer 2

There is now, two years after the question, an ‘out-of-core’ pandas equivalent: dask. It is excellent! Though it does not support all of pandas’ functionality, you can get really far with it.


Answer 3

If your datasets are between 1 and 20GB, you should get a workstation with 48GB of RAM. Then pandas can hold the entire dataset in RAM. I know it’s not the answer you’re looking for here, but doing scientific computing on a notebook with 4GB of RAM isn’t reasonable.


Answer 4

I know this is an old thread but I think the Blaze library is worth checking out. It’s built for these types of situations.

From the docs:

Blaze extends the usability of NumPy and Pandas to distributed and out-of-core computing. Blaze provides an interface similar to that of the NumPy ND-Array or Pandas DataFrame but maps these familiar interfaces onto a variety of other computational engines like Postgres or Spark.

Edit: By the way, it’s supported by ContinuumIO and Travis Oliphant, author of NumPy.
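
A small sketch of the Blaze interface, under assumptions (the database URI and column names are hypothetical); the expression is symbolic and the work is pushed down to the backend:

from blaze import data, by

# point Blaze at a backend; '::accounts' names the table inside the database
accounts = data('postgresql://localhost/creditdb::accounts')

# a symbolic group-by; with interactive data() the result is computed
# (by the Postgres engine, not in Python) when the expression is displayed
avg_by_lob = by(accounts.line_of_business, avg_balance=accounts.balance.mean())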


Answer 5

This is the case for pymongo. I have also prototyped using sql server, sqlite, HDF, ORM (SQLAlchemy) in python. First and foremost pymongo is a document based DB, so each person would be a document (dict of attributes). Many people form a collection and you can have many collections (people, stock market, income).

pd.DataFrame -> pymongo. Note: I use the chunksize in read_csv to keep it to 5 to 10k records (pymongo drops the socket if larger):

aCollection.insert((a[1].to_dict() for a in df.iterrows()))

Querying ($gt = greater than…):

pd.DataFrame(list(mongoCollection.find({'anAttribute':{'$gt':2887000, '$lt':2889000}})))

.find() returns an iterator so I commonly use ichunked to chop into smaller iterators.

How about a join since I normally get 10 data sources to paste together:

aJoinDF = pandas.DataFrame(list(mongoCollection.find({'anAttribute':{'$in':Att_Keys}})))

then (in my case, I sometimes have to aggregate on aJoinDF first before it’s “mergeable”):

df = pandas.merge(df, aJoinDF, on=aKey, how='left')

And you can then write the new info to your main collection via the update method below (logical collection vs. physical data sources).

collection.update({primarykey:foo},{key:change})
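
In current pymongo the plain update() method is deprecated; a hedged equivalent of the call above (primarykey, foo, key and change are the placeholders from this answer):

collection.update_one({primarykey: foo}, {"$set": {key: change}})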

On smaller lookups, just denormalize. For example, you have code in the document and you just add the field code text and do a dict lookup as you create documents.

Now you have a nice dataset based around a person; you can unleash your logic on each case and make more attributes. Finally, you can read your key indicators (up to your memory maximum) into pandas and do pivots/aggregation/data exploration. This works for me for 3 million records with numbers/big text/categories/codes/floats/…

You can also use the two methods built into MongoDB (MapReduce and the aggregation framework). See here for more info about the aggregation framework, as it seems to be easier than MapReduce and looks handy for quick aggregate work. Notice I didn’t need to define my fields or relations, and I can add items to a document. At the current state of the rapidly changing numpy, pandas, python toolset, MongoDB helps me just get to work :)


Answer 6

I spotted this a little late, but I work with a similar problem (mortgage prepayment models). My solution has been to skip the pandas HDFStore layer and use straight pytables. I save each column as an individual HDF5 array in my final file.

My basic workflow is to first get a CSV file from the database. I gzip it, so it’s not as huge. Then I convert that to a row-oriented HDF5 file, by iterating over it in python, converting each row to a real data type, and writing it to an HDF5 file. That takes some tens of minutes, but it doesn’t use any memory, since it’s only operating row-by-row. Then I “transpose” the row-oriented HDF5 file into a column-oriented HDF5 file.

The table transpose looks like:

def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"):
    # Get a reference to the input data.
    tb = h_in.getNode(table_path)
    # Create the output group to hold the columns.
    grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1))
    for col_name in tb.colnames:
        logger.debug("Processing %s", col_name)
        # Get the data.
        col_data = tb.col(col_name)
        # Create the output array.
        arr = h_out.createCArray(grp,
                                 col_name,
                                 tables.Atom.from_dtype(col_data.dtype),
                                 col_data.shape)
        # Store the data.
        arr[:] = col_data
    h_out.flush()

Reading it back in then looks like:

def read_hdf5(hdf5_path, group_path="/data", columns=None):
    """Read a transposed data set from a HDF5 file."""
    if isinstance(hdf5_path, tables.file.File):
        hf = hdf5_path
    else:
        hf = tables.openFile(hdf5_path)

    grp = hf.getNode(group_path)
    if columns is None:
        data = [(child.name, child[:]) for child in grp]
    else:
        data = [(child.name, child[:]) for child in grp if child.name in columns]

    # Convert any float32 columns to float64 for processing.
    for i in range(len(data)):
        name, vec = data[i]
        if vec.dtype == np.float32:
            data[i] = (name, vec.astype(np.float64))

    if not isinstance(hdf5_path, tables.file.File):
        hf.close()
    return pd.DataFrame.from_items(data)
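
A hedged usage sketch of the two functions above (file and table names are hypothetical; this keeps the PyTables 2.x camelCase API used in the answer):

import tables

h_in = tables.openFile("rows.h5", mode="r")
h_out = tables.openFile("columns.h5", mode="w")
transpose_table(h_in, "/loans", h_out)  # column arrays land under /data
h_in.close()
h_out.close()

# later, pull just the columns needed for one modeling pass
df = read_hdf5("columns.h5", columns=["property_value", "loan_age"])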

Now, I generally run this on a machine with a ton of memory, so I may not be careful enough with my memory usage. For example, by default the load operation reads the whole data set.

This generally works for me, but it’s a bit clunky, and I can’t use the fancy pytables magic.

Edit: The real advantage of this approach, over the array-of-records pytables default, is that I can then load the data into R using h5r, which can’t handle tables. Or, at least, I’ve been unable to get it to load heterogeneous tables.


Answer 7

One trick I found helpful for large data use cases is to reduce the volume of the data by reducing float precision to 32-bit. It’s not applicable in all cases, but in many applications 64-bit precision is overkill and the 2x memory savings are worth it. To make an obvious point even more obvious:

>>> df = pd.DataFrame(np.random.randn(int(1e8), 5))
>>> df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float64(5)
memory usage: 3.7 GB

>>> df.astype(np.float32).info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float32(5)
memory usage: 1.9 GB
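
Where the data allows it, the same saving can be applied at read time, so the 64-bit copy never materializes (file and column names are hypothetical):

import numpy as np
import pandas as pd

df = pd.read_csv('data.csv', dtype={'price': np.float32, 'rate': np.float32})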

Answer 8

As noted by others, after some years an ‘out-of-core’ pandas equivalent has emerged: dask. Though dask is not a drop-in replacement for pandas and all of its functionality, it stands out for several reasons:

Dask is a flexible parallel computing library for analytic computing, optimized for dynamic task scheduling of interactive computational workloads. Its “Big Data” collections, like parallel arrays, dataframes, and lists, extend common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments, and scale from laptops to clusters.

Dask emphasizes the following virtues:

  • Familiar: Provides parallelized NumPy array and Pandas DataFrame objects
  • Flexible: Provides a task scheduling interface for more custom workloads and integration with other projects.
  • Native: Enables distributed computing in Pure Python with access to the PyData stack.
  • Fast: Operates with low overhead, low latency, and minimal serialization necessary for fast numerical algorithms
  • Scales up: Runs resiliently on clusters with 1000s of cores
  • Scales down: Trivial to set up and run on a laptop in a single process
  • Responsive: Designed with interactive computing in mind, it provides rapid feedback and diagnostics to aid humans

and to add a simple code sample:

import dask.dataframe as dd
df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()

replaces some pandas code like this:

import pandas as pd
df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()

and, especially noteworthy, it provides, through the concurrent.futures interface, a general infrastructure for the submission of custom tasks:

from dask.distributed import Client
client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()

Answer 9

Ray is worth mentioning here as well: it’s a distributed computation framework that has its own distributed implementation of pandas.

Just replace the pandas import, and the code should work as is:

# import pandas as pd
import ray.dataframe as pd

# use pd as usual

You can read more details here:

https://rise.cs.berkeley.edu/blog/pandas-on-ray/


Answer 10

One more variation

Many of the operations done in pandas can also be done as a DB query (SQL, Mongo).

Using an RDBMS or mongodb allows you to perform some of the aggregations in the DB query (which is optimized for large data, and uses caches and indexes efficiently).

Later, you can perform post processing using pandas.

The advantage of this method is that you gain the DB optimizations for working with large data, while still defining the logic in a high-level declarative syntax, and not having to deal with the details of deciding what to do in memory and what to do out of core.

And although the query language and pandas are different, it’s usually not complicated to translate part of the logic from one to another.
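
A minimal sketch of the idea, using sqlite as a stand-in for any RDBMS (table and column names are hypothetical):

import sqlite3
import pandas as pd

conn = sqlite3.connect("accounts.db")

# the heavy aggregation runs inside the database...
df = pd.read_sql(
    "SELECT line_of_business, COUNT(*) AS n, AVG(balance) AS avg_balance "
    "FROM accounts GROUP BY line_of_business",
    conn,
)

# ...and pandas post-processes a result that easily fits in memory
df["share"] = df["n"] / df["n"].sum()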


Answer 11

Consider Ruffus if you go the simple path of creating a data pipeline which is broken down into multiple smaller files.


Answer 12

I recently came across a similar issue. I found that simply reading the data in chunks and appending each processed chunk to the same csv works well. My problem was adding a date column based on information in another table, using the value of certain columns as follows. This may help those confused by dask and hdf5 but more familiar with pandas, like myself.

import pandas as pd

def addDateColumn():
    """Adds time to the daily rainfall data. Reads the csv as chunks of 100k
    rows at a time and outputs them, appending as needed, to a single csv.
    Uses the column of the raster names to get the date.
    """
    # pathlist (input/output directories) and newyears (raster number -> date
    # lookup) are defined elsewhere in the script
    df = pd.read_csv(pathlist[1]+"CHIRPS_tanz.csv", iterator=True,
                     chunksize=100000)  # read csv file as 100k chunks

    '''Do some stuff'''

    count = 1  # for indexing item in time list
    chunknum = 0  # number of chunks processed so far
    for chunk in df:  # for each 100k rows
        newtime = []  # empty list to append repeating times for different rows
        toiterate = chunk[chunk.columns[2]]  # ID of raster nums the time is based on
        while count <= toiterate.max():
            for i in toiterate:
                if i == count:
                    newtime.append(newyears[count])
            count += 1
        chunknum += 1
        print("Finished", str(chunknum), "chunks")
        chunk["time"] = newtime  # create new column in dataframe based on time
        outname = "CHIRPS_tanz_time2.csv"
        # append each output to the same csv, using no header
        chunk.to_csv(pathlist[2]+outname, mode='a', header=None, index=None)

Answer 13

I’d like to point out the Vaex package.

Vaex is a python library for lazy out-of-core DataFrames (similar to pandas), to visualize and explore big tabular datasets. It can calculate statistics such as mean, sum, count, standard deviation etc. on an N-dimensional grid at up to a billion (10⁹) objects/rows per second. Visualization is done using histograms, density plots and 3d volume rendering, allowing interactive exploration of big data. Vaex uses memory mapping, a zero-memory-copy policy and lazy computations for best performance (no memory wasted).

Have a look at the documentation: https://vaex.readthedocs.io/en/latest/. The API is very close to the API of pandas.
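
A minimal sketch, assuming a pre-built HDF5 file (the file and column names are hypothetical):

import vaex

df = vaex.open("big.hdf5")   # memory-mapped: opening costs almost nothing
df["ratio"] = df.a / df.b    # a lazy virtual column, never materialized
print(df.mean(df.ratio))     # statistics stream over the data out-of-core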


Answer 14

Why pandas? Have you tried standard Python?

Consider the standard Python library. pandas is subject to frequent updates, even with the recent release of the stable version.

Using the standard Python library, your code will always run.

One way of doing it is to have an idea of the way you want your data to be stored, and which questions you want to solve regarding the data. Then draw a schema of how you can organise your data (think tables) that will help you query the data, not necessarily full normalisation.

You can make good use of the following (a short sketch follows the list):

  • a list of dictionaries to store the data in memory, one dict being one row,
  • generators to process the data row after row, so as not to overflow your RAM,
  • list comprehensions to query your data,
  • Counter, defaultdict, …
  • storing your data on your hard drive using whatever storage solution you have chosen; json could be one of them.
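
A minimal stdlib-only sketch of these ideas (the file layout and field names are hypothetical):

import json
from collections import Counter

def rows(path):
    # yield one dict per line from a JSON-lines file, row after row
    with open(path) as f:
        for line in f:
            yield json.loads(line)

# query with a generator expression instead of materializing everything
retail = (r for r in rows("accounts.jsonl") if r["line_of_business"] == "retail")

# aggregate while streaming
defaults_by_state = Counter(r["state"] for r in retail if r["defaulted"])
print(defaults_by_state.most_common(5))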

RAM and HDD are becoming cheaper and cheaper with time, and standard Python 3 is widely available and stable.


Answer 15

At the moment I am working “like” you, just at a smaller scale, which is why I don’t have a PoC for my suggestion.

However, I have had success using pickle as a caching system and outsourcing the execution of various functions into files, executing these files from my command / main file. For example, I use a prepare_use.py to convert object types and split a data set into test, validation and prediction data sets.

How does the caching with pickle work? I use strings to access dynamically created pickle files, depending on which parameters and data sets were passed (with that I try to capture and determine whether the program was already run, using .shape for the data set and a dict for the passed parameters). With these measures, I get a string with which to try to find and read a .pickle file and, if found, can skip the processing time and jump straight to the step I am working on right now.
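
A minimal sketch of that caching idea (all names are hypothetical; compute is a zero-argument callable doing the actual work):

import os
import pickle

def cached(step_name, df_shape, params, compute, cache_dir="cache"):
    # build the key from the data set's shape and the passed parameters
    key = "%s_%sx%s_%s" % (step_name, df_shape[0], df_shape[1],
                           "_".join("%s-%s" % kv for kv in sorted(params.items())))
    path = os.path.join(cache_dir, key + ".pickle")
    if os.path.exists(path):
        with open(path, "rb") as f:
            return pickle.load(f)  # already computed: skip the work entirely
    result = compute()
    os.makedirs(cache_dir, exist_ok=True)
    with open(path, "wb") as f:
        pickle.dump(result, f)
    return result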

Using databases I encountered similar problems, which is why I found joy in this solution; there are certainly many constraints, for example storing huge pickle sets due to redundancy. Updating a table from before to after a transformation can be done with proper indexing; validating information opens up a whole other book (I tried consolidating crawled rent data and basically stopped using a database after 2 hours, as I would have liked to jump back after every transformation process).

I hope my 2 cents help you in some way.

Greetings.