我想測驗將檔案上傳到 S3 存盤桶。此存檔已解壓縮,因此我也想測驗提取的 JSON 檔案。這是我想要測驗的內容:
def get_files_from_s3(bucket_name, s3_prefix):
files = []
s3_resource = boto3.resource("s3")
bucket = s3_resource.Bucket(bucket_name)
response = bucket.objects.filter(Prefix=s3_prefix)
for obj in response:
if obj.key.endswidth('.zip'):
buffer = BytesIO(obj.get()["Body"].read())
print("Before unzipping the file")
z = ZipFile(buffer)
for filename in z.namelist():
print("After unzipping the file")
if filename == "files/file1.json":
for dict in self.parse_json(z, filename):
# store some data from the json
data.append(...)
return data
def parse_json(obj, file):
with obj.open(file, "r") as f:
data = json.loads(f.read())
return data["some_key"]
這里就是我試過在單元測驗中得到答案后在這里我剛才的問題:
from moto import mock_s3
@pytest.fixture(scope='function')
def aws_credentials():
"""Mocked AWS Credentials, to ensure we're not touching AWS directly"""
os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'testing'
os.environ['AWS_SECURITY_TOKEN'] = 'testing'
os.environ['AWS_SESSION_TOKEN'] = 'testing'
@mock_s3
def test_get_files_from_s3(self, aws_credentials):
s3 = boto3.resource('s3')
bucket = s3.Bucket(self.bucket_name)
# Create the bucket first, as we're interacting with an empty mocked 'AWS account'
bucket.create()
# Create some example files that are representative of what the S3 bucket would look like in production
client = boto3.client('s3')
client.put_object(Bucket=self.bucket_name, Key=s3_prefix "file.zip", Body=open('local_file.zip', 'r'))
client.put_object(Bucket=self.bucket_name, Key=s3_prefix "file.nonzip", Body="...")
# Retrieve the files again using whatever logic
files = module.get_files_from_s3(BUCKET_NAME, S3_PREFIX)
self.assertEqual(['file.zip'], files)
所以我在這里關心的是如何在傳遞給 moto 時模擬 ZipFile 物件?因為當我Body用我想發送到 S3 客戶端的實際 zi??p 檔案傳遞引數時,我發現了這個錯誤:E UnicodeDecodeError: 'utf-8' codec can't decode byte 0x9b in position 10: invalid start byte
另一個問題是如何將該物件傳遞給另一個函式,例如parse_json()?
謝謝
uj5u.com熱心網友回復:
所述Body-parameter只接受位元組,這意味著open-method需要回傳使用的檔案的位元組內容rb的引數:
client.put_object(Bucket=self.bucket_name, Key=s3_prefix "file.zip", Body=open('local_file.zip', 'rb'))
請注意,這不是 Zipfile/mocking/moto 問題,而是 boto3 限制。
如果第二個引數只是r,則內容將作為字串回傳。嘗試上傳時,AWS 將失敗并顯示以下錯誤訊息:TypeError: Unicode-objects must be encoded before hashing
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/374052.html
標籤:Python 单元测试 亚马逊-s3 嘲笑 boto3
下一篇:為這個S3函式撰寫單元測驗
