将Dataframe保存到csv直接保存到s3 Python

问题:将Dataframe保存到csv直接保存到s3 Python

我有一个要上传到新CSV文件的pandas DataFrame。问题是在将文件传输到s3之前,我不想在本地保存文件。是否有像to_csv这样的方法可以将数据帧直接写入s3?我正在使用boto3。
这是我到目前为止的内容:

import boto3
s3 = boto3.client('s3', aws_access_key_id='key', aws_secret_access_key='secret_key')
read_file = s3.get_object(Bucket, Key)
df = pd.read_csv(read_file['Body'])

# Make alterations to DataFrame

# Then export DataFrame to CSV through direct transfer to s3

I have a pandas DataFrame that I want to upload to a new CSV file. The problem is that I don’t want to save the file locally before transferring it to s3. Is there any method like to_csv for writing the dataframe to s3 directly? I am using boto3.
Here is what I have so far:

import boto3
s3 = boto3.client('s3', aws_access_key_id='key', aws_secret_access_key='secret_key')
read_file = s3.get_object(Bucket, Key)
df = pd.read_csv(read_file['Body'])

# Make alterations to DataFrame

# Then export DataFrame to CSV through direct transfer to s3

回答 0

您可以使用:

from io import StringIO # python3; python2: BytesIO 
import boto3

bucket = 'my_bucket_name' # already created on S3
csv_buffer = StringIO()
df.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket, 'df.csv').put(Body=csv_buffer.getvalue())

You can use:

from io import StringIO # python3; python2: BytesIO 
import boto3

bucket = 'my_bucket_name' # already created on S3
csv_buffer = StringIO()
df.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket, 'df.csv').put(Body=csv_buffer.getvalue())

回答 1

您可以直接使用S3路径。我正在使用Pandas 0.24.1

In [1]: import pandas as pd

In [2]: df = pd.DataFrame( [ [1, 1, 1], [2, 2, 2] ], columns=['a', 'b', 'c'])

In [3]: df
Out[3]:
   a  b  c
0  1  1  1
1  2  2  2

In [4]: df.to_csv('s3://experimental/playground/temp_csv/dummy.csv', index=False)

In [5]: pd.__version__
Out[5]: '0.24.1'

In [6]: new_df = pd.read_csv('s3://experimental/playground/temp_csv/dummy.csv')

In [7]: new_df
Out[7]:
   a  b  c
0  1  1  1
1  2  2  2

发行公告:

S3文件处理

熊猫现在使用s3fs处理S3连接。这不应破坏任何代码。但是,由于s3fs不是必需的依赖项,因此您将需要单独安装它,例如以前版本的panda中的boto。GH11915

You can directly use the S3 path. I am using Pandas 0.24.1

In [1]: import pandas as pd

In [2]: df = pd.DataFrame( [ [1, 1, 1], [2, 2, 2] ], columns=['a', 'b', 'c'])

In [3]: df
Out[3]:
   a  b  c
0  1  1  1
1  2  2  2

In [4]: df.to_csv('s3://experimental/playground/temp_csv/dummy.csv', index=False)

In [5]: pd.__version__
Out[5]: '0.24.1'

In [6]: new_df = pd.read_csv('s3://experimental/playground/temp_csv/dummy.csv')

In [7]: new_df
Out[7]:
   a  b  c
0  1  1  1
1  2  2  2

Release Note:

S3 File Handling

pandas now uses s3fs for handling S3 connections. This shouldn’t break any code. However, since s3fs is not a required dependency, you will need to install it separately, like boto in prior versions of pandas. GH11915.


回答 2

我喜欢s3fs,它使您可以像本地文件系统一样(几乎)使用s3。

你可以这样做:

import s3fs

bytes_to_write = df.to_csv(None).encode()
fs = s3fs.S3FileSystem(key=key, secret=secret)
with fs.open('s3://bucket/path/to/file.csv', 'wb') as f:
    f.write(bytes_to_write)

s3fs只支持rbwb打开文件,这就是为什么我做这个模式bytes_to_write的东西。

I like s3fs which lets you use s3 (almost) like a local filesystem.

You can do this:

import s3fs

bytes_to_write = df.to_csv(None).encode()
fs = s3fs.S3FileSystem(key=key, secret=secret)
with fs.open('s3://bucket/path/to/file.csv', 'wb') as f:
    f.write(bytes_to_write)

s3fs supports only rb and wb modes of opening the file, that’s why I did this bytes_to_write stuff.


回答 3

这是最新的答案:

import s3fs

s3 = s3fs.S3FileSystem(anon=False)

# Use 'w' for py3, 'wb' for py2
with s3.open('<bucket-name>/<filename>.csv','w') as f:
    df.to_csv(f)

StringIO的问题在于它将吞噬您的内存。使用此方法,您将文件流式传输到s3,而不是将其转换为字符串,然后将其写入s3。将pandas数据框及其字符串副本保存在内存中似乎效率很低。

如果您在ec2 Instant中工作,则可以为其赋予IAM角色以使其能够写入s3,因此您无需直接传递凭据。但是,您也可以通过将凭据传递给S3FileSystem()功能来连接到存储桶。请参阅文档:https : //s3fs.readthedocs.io/en/latest/

This is a more up to date answer:

import s3fs

s3 = s3fs.S3FileSystem(anon=False)

# Use 'w' for py3, 'wb' for py2
with s3.open('<bucket-name>/<filename>.csv','w') as f:
    df.to_csv(f)

The problem with StringIO is that it will eat away at your memory. With this method, you are streaming the file to s3, rather than converting it to string, then writing it into s3. Holding the pandas dataframe and its string copy in memory seems very inefficient.

If you are working in an ec2 instant, you can give it an IAM role to enable writing it to s3, thus you dont need to pass in credentials directly. However, you can also connect to a bucket by passing credentials to the S3FileSystem() function. See documention:https://s3fs.readthedocs.io/en/latest/


回答 4

如果None将第一个参数传递to_csv()给数据,则将以字符串形式返回。从那里开始,只需一步即可将其上传到S3。

也可以将一个StringIO对象传递给to_csv(),但是使用字符串会更容易。

If you pass None as the first argument to to_csv() the data will be returned as a string. From there it’s an easy step to upload that to S3 in one go.

It should also be possible to pass a StringIO object to to_csv(), but using a string will be easier.


回答 5

您还可以使用AWS Data Wrangler

import awswrangler

session = awswrangler.Session()
session.pandas.to_csv(
    dataframe=df,
    path="s3://...",
)

请注意,由于它是并行上传的,因此它将分为几部分。

You can also use the AWS Data Wrangler:

import awswrangler as wr
    
wr.s3.to_csv(
    df=df,
    path="s3://...",
)

Note that it will handle multipart upload for you to make the upload faster.


回答 6

我发现也可以使用client,而不仅仅是resource

from io import StringIO
import boto3
s3 = boto3.client("s3",\
                  region_name=region_name,\
                  aws_access_key_id=aws_access_key_id,\
                  aws_secret_access_key=aws_secret_access_key)
csv_buf = StringIO()
df.to_csv(csv_buf, header=True, index=False)
csv_buf.seek(0)
s3.put_object(Bucket=bucket, Body=csv_buf.getvalue(), Key='path/test.csv')

I found this can be done using client also and not just resource.

from io import StringIO
import boto3
s3 = boto3.client("s3",\
                  region_name=region_name,\
                  aws_access_key_id=aws_access_key_id,\
                  aws_secret_access_key=aws_secret_access_key)
csv_buf = StringIO()
df.to_csv(csv_buf, header=True, index=False)
csv_buf.seek(0)
s3.put_object(Bucket=bucket, Body=csv_buf.getvalue(), Key='path/test.csv')

回答 7

由于您正在使用boto3.client(),请尝试:

import boto3
from io import StringIO #python3 
s3 = boto3.client('s3', aws_access_key_id='key', aws_secret_access_key='secret_key')
def copy_to_s3(client, df, bucket, filepath):
    csv_buf = StringIO()
    df.to_csv(csv_buf, header=True, index=False)
    csv_buf.seek(0)
    client.put_object(Bucket=bucket, Body=csv_buf.getvalue(), Key=filepath)
    print(f'Copy {df.shape[0]} rows to S3 Bucket {bucket} at {filepath}, Done!')

copy_to_s3(client=s3, df=df_to_upload, bucket='abc', filepath='def/test.csv')

since you are using boto3.client(), try:

import boto3
from io import StringIO #python3 
s3 = boto3.client('s3', aws_access_key_id='key', aws_secret_access_key='secret_key')
def copy_to_s3(client, df, bucket, filepath):
    csv_buf = StringIO()
    df.to_csv(csv_buf, header=True, index=False)
    csv_buf.seek(0)
    client.put_object(Bucket=bucket, Body=csv_buf.getvalue(), Key=filepath)
    print(f'Copy {df.shape[0]} rows to S3 Bucket {bucket} at {filepath}, Done!')

copy_to_s3(client=s3, df=df_to_upload, bucket='abc', filepath='def/test.csv')

回答 8

我找到了一个似乎很有效的简单解决方案:

s3 = boto3.client("s3")

s3.put_object(
    Body=open("filename.csv").read(),
    Bucket="your-bucket",
    Key="your-key"
)

希望能有所帮助!

I found a very simple solution that seems to be working :

s3 = boto3.client("s3")

s3.put_object(
    Body=open("filename.csv").read(),
    Bucket="your-bucket",
    Key="your-key"
)

Hope that helps !


回答 9

我从存储桶s3中读取了两列的csv,并将文件csv的内容放入了pandas数据框。

例:

config.json

{
  "credential": {
    "access_key":"xxxxxx",
    "secret_key":"xxxxxx"
}
,
"s3":{
       "bucket":"mybucket",
       "key":"csv/user.csv"
   }
}

cls_config.json

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import json

class cls_config(object):

    def __init__(self,filename):

        self.filename = filename


    def getConfig(self):

        fileName = os.path.join(os.path.dirname(__file__), self.filename)
        with open(fileName) as f:
        config = json.load(f)
        return config

cls_pandas.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pandas as pd
import io

class cls_pandas(object):

    def __init__(self):
        pass

    def read(self,stream):

        df = pd.read_csv(io.StringIO(stream), sep = ",")
        return df

cls_s3.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import boto3
import json

class cls_s3(object):

    def  __init__(self,access_key,secret_key):

        self.s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    def getObject(self,bucket,key):

        read_file = self.s3.get_object(Bucket=bucket, Key=key)
        body = read_file['Body'].read().decode('utf-8')
        return body

test.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from cls_config import *
from cls_s3 import *
from cls_pandas import *

class test(object):

    def __init__(self):
        self.conf = cls_config('config.json')

    def process(self):

        conf = self.conf.getConfig()

        bucket = conf['s3']['bucket']
        key = conf['s3']['key']

        access_key = conf['credential']['access_key']
        secret_key = conf['credential']['secret_key']

        s3 = cls_s3(access_key,secret_key)
        ob = s3.getObject(bucket,key)

        pa = cls_pandas()
        df = pa.read(ob)

        print df

if __name__ == '__main__':
    test = test()
    test.process()

I read a csv with two columns from bucket s3, and the content of the file csv i put in pandas dataframe.

Example:

config.json

{
  "credential": {
    "access_key":"xxxxxx",
    "secret_key":"xxxxxx"
}
,
"s3":{
       "bucket":"mybucket",
       "key":"csv/user.csv"
   }
}

cls_config.json

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import json

class cls_config(object):

    def __init__(self,filename):

        self.filename = filename


    def getConfig(self):

        fileName = os.path.join(os.path.dirname(__file__), self.filename)
        with open(fileName) as f:
        config = json.load(f)
        return config

cls_pandas.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pandas as pd
import io

class cls_pandas(object):

    def __init__(self):
        pass

    def read(self,stream):

        df = pd.read_csv(io.StringIO(stream), sep = ",")
        return df

cls_s3.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import boto3
import json

class cls_s3(object):

    def  __init__(self,access_key,secret_key):

        self.s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    def getObject(self,bucket,key):

        read_file = self.s3.get_object(Bucket=bucket, Key=key)
        body = read_file['Body'].read().decode('utf-8')
        return body

test.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from cls_config import *
from cls_s3 import *
from cls_pandas import *

class test(object):

    def __init__(self):
        self.conf = cls_config('config.json')

    def process(self):

        conf = self.conf.getConfig()

        bucket = conf['s3']['bucket']
        key = conf['s3']['key']

        access_key = conf['credential']['access_key']
        secret_key = conf['credential']['secret_key']

        s3 = cls_s3(access_key,secret_key)
        ob = s3.getObject(bucket,key)

        pa = cls_pandas()
        df = pa.read(ob)

        print df

if __name__ == '__main__':
    test = test()
    test.process()