最近,我需要通過 SFTP 將大檔案(> 0.5 GB)傳輸到 S3 儲存。根據 boto3 手冊,我應該使用分段上傳(multipart upload)。我找到了一個不錯的教程,做了一些小改動後自己試用了這段代碼。長話短說:它能工作,但速度慢得離譜(約 150 KB/s),對我這種大檔案的場景尤其難以接受。據我所知,paramiko 可能是瓶頸,但我嘗試開啟預取(prefetch)和調整不同的緩衝區大小,都沒有帶來明顯改善(我得到的最好結果是:在一個約 1.5 GB 的檔案上,開啟預取時第一個塊約 1500 KB/s,之後其餘的塊降到 250-300 KB/s)。由於我已經沒有思路了,非常感謝任何關於下一步該嘗試什麼的提示和想法,提前致謝!這是我的代碼:
AWS 會話包裝器
aws.py
import boto3
import os
def aws_session(region_name='us-east-1'):
    """Create a boto3 Session from credentials found in the environment.

    The access key id is read from ``AWS_ACCESS_KEY_ID``. The secret is read
    from ``AWS_ACCESS_KEY_SECRET`` (the name this project originally used)
    and, when that is unset or empty, from the standard
    ``AWS_SECRET_ACCESS_KEY``.

    :param region_name: AWS region for the session (default ``us-east-1``).
    :return: a ``boto3.session.Session``.
    """
    aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')
    # Fall back to the standard variable name. Without this, an environment
    # that uses the standard names yields a key id paired with a ``None``
    # secret, which cannot sign any request.
    aws_access_key_secret = (os.environ.get('AWS_ACCESS_KEY_SECRET')
                             or os.environ.get('AWS_SECRET_ACCESS_KEY'))
    return boto3.session.Session(aws_access_key_id=aws_access_key_id,
                                 aws_secret_access_key=aws_access_key_secret,
                                 region_name=region_name)
主腳本
流檔案
import paramiko
import math
import time
from aws import aws_session
from dotenv import load_dotenv
def open_sftp_connection(sftp_host, sftp_port, sftp_username, sftp_password):
    """Open an SFTP session to ``sftp_host:sftp_port`` using password auth.

    Returns a ``paramiko.SFTPClient`` on success. For backward compatibility,
    failures are reported as sentinel strings rather than exceptions:

    * ``'conn_error'`` -- the transport could not be created/connected;
    * ``'auth_error'`` -- ``transport.connect`` (handshake/login) failed.
    """
    try:
        # Transport takes ONE ``sock`` argument; pass (host, port) as a tuple.
        # The previous ``Transport(host, port)`` bound the port to the second
        # positional parameter, ``default_window_size``: the port was ignored
        # and the SSH flow-control window was set to 22 bytes (clamped up to
        # paramiko's small minimum), throttling every download.
        transport = paramiko.Transport((sftp_host, sftp_port))
    except Exception:
        return 'conn_error'
    try:
        transport.connect(username=sftp_username, password=sftp_password)
    except Exception:
        # Release the socket (and the transport thread, if it started).
        transport.close()
        return 'auth_error'
    return paramiko.SFTPClient.from_transport(transport)
def transfer_chunk_from_sftp_to_s3(sftp_file, s3_connection, multipart_upload, bucket_name,
                                   s3_file_path, part_number, chunk_size):
    """Read up to ``chunk_size`` bytes from ``sftp_file`` and upload them as one part.

    :param sftp_file: readable file object positioned at the next chunk.
    :param s3_connection: boto3 S3 client.
    :param multipart_upload: response of ``create_multipart_upload``; only
        its ``"UploadId"`` is used.
    :param part_number: 1-based S3 part number for this chunk.
    :return: ``{"PartNumber": part_number, "ETag": <etag>}`` for
        ``complete_multipart_upload``.

    Prints the combined read+upload throughput of the chunk.
    """
    # perf_counter is monotonic, unlike time.time(), so the interval cannot
    # be distorted by wall-clock adjustments.
    start_time = time.perf_counter()
    chunk = sftp_file.read(int(chunk_size))
    part = s3_connection.upload_part(
        Bucket=bucket_name,
        Key=s3_file_path,
        PartNumber=part_number,
        UploadId=multipart_upload["UploadId"],
        Body=chunk)
    total_seconds = time.perf_counter() - start_time
    # Use the bytes actually read: the last chunk is usually shorter than
    # chunk_size, so the nominal size overstated its speed. Skip the report
    # when no measurable time elapsed instead of dividing by zero.
    if total_seconds > 0:
        print(
            "speed is {} kb/s total seconds taken {}".format(
                math.ceil((len(chunk) / 1024) / total_seconds), total_seconds
            )
        )
    return {"PartNumber": part_number, "ETag": part["ETag"]}
def transfer_file_from_sftp_to_s3(bucket_name, sftp_host, sftp_port, sftp_path,
                                  aws_s3_path, sftp_username, sftp_password, chunk_size=26214400):
    """Copy ``sftp_path`` from an SFTP server to ``aws_s3_path`` in ``bucket_name``.

    The remote file is read sequentially in ``chunk_size``-byte pieces
    (default 25 MiB); each piece becomes one part of an S3 multipart upload.
    Reading and uploading of a piece happen one after the other.

    If anything fails after the multipart upload has been created, it is
    aborted so S3 does not keep the already-uploaded parts. The SFTP session
    and its transport are always closed.

    :raises ConnectionError: if ``open_sftp_connection`` reports
        ``'conn_error'`` or ``'auth_error'``.
    """
    sftp_connection = open_sftp_connection(sftp_host, int(sftp_port), sftp_username, sftp_password)
    if isinstance(sftp_connection, str):
        # open_sftp_connection signals failure with a sentinel string instead
        # of raising; previously this surfaced as an AttributeError on .file().
        raise ConnectionError("SFTP {} for {}:{}".format(sftp_connection, sftp_host, sftp_port))
    transport = sftp_connection.get_channel().get_transport()
    try:
        with sftp_connection.file(sftp_path, "r", bufsize=-1) as sftp_file:
            s3_connection = aws_session().client('s3')
            # Public equivalent of the private SFTPFile._get_size().
            sftp_file_size = sftp_file.stat().st_size
            print('file size: ', sftp_file_size)
            chunk_count = int(math.ceil(sftp_file_size / float(chunk_size)))
            print('amount of chunks: ', chunk_count)
            multipart_upload = s3_connection.create_multipart_upload(Bucket=bucket_name, Key=aws_s3_path)
            upload_id = multipart_upload["UploadId"]
            try:
                parts = []
                # S3 part numbers are 1-based.
                for part_number in range(1, chunk_count + 1):
                    print("Transferring chunk {}...".format(part_number))
                    part = transfer_chunk_from_sftp_to_s3(
                        sftp_file,
                        s3_connection,
                        multipart_upload,
                        bucket_name,
                        aws_s3_path,
                        part_number,
                        chunk_size,
                    )
                    parts.append(part)
                    print("Chunk {} Transferred Successfully!".format(part_number))
                s3_connection.complete_multipart_upload(
                    Bucket=bucket_name,
                    Key=aws_s3_path,
                    UploadId=upload_id,
                    MultipartUpload={"Parts": parts},
                )
            except BaseException:
                # Don't leave an incomplete multipart upload (and its stored
                # parts) behind on failure or interruption.
                s3_connection.abort_multipart_upload(
                    Bucket=bucket_name, Key=aws_s3_path, UploadId=upload_id)
                raise
    finally:
        sftp_connection.close()
        transport.close()
if __name__ == '__main__':
    # Presumably loads AWS credentials from a local .env file so that
    # aws_session() can read them from the environment.
    load_dotenv()
    # Fill these in before running.
    bucket_name = ''
    sftp_host = ''
    sftp_port = 22
    sftp_path = ''
    aws_s3_path = ''
    sftp_username = ''
    sftp_password = ''
    transfer_file_from_sftp_to_s3(
        bucket_name=bucket_name,
        sftp_host=sftp_host,
        # Pass the configured port instead of a hard-coded 22.
        sftp_port=sftp_port,
        sftp_path=sftp_path,
        aws_s3_path=aws_s3_path,
        sftp_username=sftp_username,
        sftp_password=sftp_password
    )
PS:我使用這個腳本的場景如下:我需要接收(來自任何地方的)大檔案,並通過一條處理管道處理它們。但該管道是隔離的,無法從外部訪問,所有涉及資料的通信都通過 S3 儲存完成。我想到的方案是在中間放一臺小型機器。它接收 SFTP 憑據,並將檔案以“流式”(或逐塊上傳)的方式傳到 S3 儲存。管道再從那裡取走檔案並完成作業。
uj5u.com熱心網友回復:
這里有三個問題在起作用:
您把下載和上傳一個塊的時間當作一次傳輸來計時。所以,如果下載 1 MB 需要 100 秒、上傳它需要 10 秒,您報告的速度是 10 KB/s(1 MB / 110 秒)。而實際情況是下載 11 KB/s、上傳 103 KB/s(均按腳本中 math.ceil 向上取整計算)。我提到這一點,是因為它掩蓋了第二個問題。
在某些環境下,Paramiko 的持續大批量傳輸存在性能問題。paramiko 的這個 bug(issue 175)中有更多細節。
此外,除了上述問題,您是先下載一整塊,然後再轉身上傳它。在大多數環境中這很浪費:即使一切運行正常,通常也意味著下載有一半的時間在等待上傳。
您可以解決所有問題:
import paramiko
import boto3
import multiprocessing
import time
# Simple helper to time an activity and report how fast data was
# transferred during it.
class SpeedTracker:
    """Time one activity and print its throughput in MB/s.

    Intervals are measured with ``time.perf_counter()``, a monotonic
    high-resolution clock, so wall-clock adjustments cannot yield negative
    or inflated durations (``time.time()`` offers no such guarantee).
    """

    def __init__(self):
        # Timing starts at construction, so start() is optional.
        self._start = time.perf_counter()
        self._end = None

    def start(self):
        """Restart timing from now."""
        self._start = time.perf_counter()

    def end(self, bytes, desc):
        """Stop timing and print ``"<desc> done, X.XXX MB/s"``.

        :param bytes: number of bytes moved since the last start (parameter
            name kept for caller compatibility; it shadows the builtin here).
        :param desc: label printed before the rate.

        Nothing is printed when no measurable time has elapsed.
        """
        self._end = time.perf_counter()
        secs = self._end - self._start
        if secs > 0:
            print(f"{desc} done, {(bytes / 1048576) / secs:0.3f} MB/s")
class FastTransport(paramiko.Transport):
    """paramiko Transport tuned for large bulk transfers (see paramiko issue 175).

    * Requests a very large (2**31 - 1 byte) SSH flow-control window so the
      server can keep streaming without stalling on window adjustments.
    * Raises the rekey thresholds to 2**40 bytes/packets, so the session is
      effectively never re-keyed during a transfer. NOTE(review): this trades
      away routine SSH key rotation; fine for a one-off bulk copy, confirm
      before using it for long-lived sessions.
    """

    def __init__(self, sock):
        window_size = 2147483647
        # Current paramiko takes the window as a constructor argument; an
        # attribute assigned after construction does not change the window
        # used for new channels. The attribute assignment from the original
        # workaround is kept for compatibility.
        super(FastTransport, self).__init__(sock, default_window_size=window_size)
        self.window_size = window_size
        self.packetizer.REKEY_BYTES = pow(2, 40)
        self.packetizer.REKEY_PACKETS = pow(2, 40)
def open_sftp_connection(sftp_host, sftp_port, sftp_username, sftp_password, sftp_key):
    """Open an SFTP session to ``sftp_host:sftp_port`` over a FastTransport.

    Authenticates with ``sftp_password`` when it is not None, otherwise with
    the RSA private key file at path ``sftp_key``. Any exception from
    connecting or authenticating propagates; the transport is closed first
    so its socket (and background thread, if started) is not leaked.

    NOTE(review): ``connect`` is called without ``hostkey``, so the server's
    host key is not verified.
    """
    transport = FastTransport((sftp_host, sftp_port))
    try:
        # Not necessary, but here for testing purposes, support either
        # password or private key auth
        if sftp_password is not None:
            transport.connect(username=sftp_username, password=sftp_password)
        else:
            pkey = paramiko.RSAKey.from_private_key_file(sftp_key)
            transport.connect(username=sftp_username, pkey=pkey)
        return paramiko.SFTPClient.from_transport(transport)
    except BaseException:
        transport.close()
        raise
def pull_from_sftp(sftp_host, sftp_port, sftp_username, sftp_password, sftp_key, sftp_path, queue):
    """Worker-process entry point: stream ``sftp_path`` into ``queue``.

    Puts successive ``bytes`` chunks (up to 8 MiB each) on ``queue``, then a
    ``None`` end-of-file marker. On error the exception propagates (the
    worker exits with a non-zero code) and NO ``None`` is sent, so a
    consumer must not treat a missing marker as a complete file. The SFTP
    file, session and transport are always closed.
    """
    sftp_connection = open_sftp_connection(sftp_host, int(sftp_port), sftp_username, sftp_password, sftp_key)
    transport = sftp_connection.get_channel().get_transport()
    try:
        with sftp_connection.file(sftp_path, "rb") as sftp_file:
            # Enable pipelined mode, see paramiko issue 175.
            # NOTE(review): paramiko documents pipelining as affecting writes.
            sftp_file.set_pipelined()
            # Let paramiko fetch data ahead in a background thread.
            # NOTE(review): prefetch requests the rest of the file; if uploads
            # are slower than downloads, the data buffers in this process's memory.
            sftp_file.prefetch()
            # 8 MiB: above S3's 5 MiB minimum size for non-final parts.
            chunk_size = 8388608
            tracker = SpeedTracker()
            num = 0
            while True:
                # Download one chunk
                tracker.start()
                chunk = sftp_file.read(chunk_size)
                if not chunk:
                    # All done: tell the consumer to stop
                    queue.put(None)
                    break
                # Was ``num = 1``, which labelled every chunk "#1".
                num += 1
                tracker.end(len(chunk), f"Downloaded chunk #{num}")
                # Send the chunk off to the uploading process
                queue.put(chunk)
    finally:
        sftp_connection.close()
        transport.close()
def send_chunk_to_s3(s3_connection, multipart_upload, bucket_name,
                     s3_file_path, part_number, chunk):
    """Upload ``chunk`` as part ``part_number`` of an existing multipart upload.

    Prints the upload throughput and returns the ``PartNumber``/``ETag``
    mapping that ``complete_multipart_upload`` expects for this part.
    """
    timer = SpeedTracker()
    upload_id = multipart_upload["UploadId"]
    response = s3_connection.upload_part(
        Body=chunk,
        Bucket=bucket_name,
        Key=s3_file_path,
        PartNumber=part_number,
        UploadId=upload_id,
    )
    timer.end(len(chunk), f"Uploaded chunk #{part_number}")
    return dict(PartNumber=part_number, ETag=response["ETag"])
def _next_chunk(chunk_queue, proc, poll_seconds=5.0):
    """Return the next item from ``chunk_queue``; raise if ``proc`` died first.

    A plain blocking ``get()`` would wait forever if the producer process
    crashed before sending its ``None`` end-of-file marker.
    """
    from queue import Empty  # raised by multiprocessing.Queue.get on timeout

    while True:
        try:
            return chunk_queue.get(timeout=poll_seconds)
        except Empty:
            if proc.is_alive():
                continue
        # The producer has exited. A process flushes what it put on a
        # multiprocessing.Queue before it terminates, so one final read decides.
        try:
            return chunk_queue.get(timeout=1)
        except Empty:
            raise RuntimeError(
                f"SFTP reader process exited with code {proc.exitcode} "
                "before sending the whole file"
            ) from None


def transfer_file_from_sftp_to_s3(bucket_name, sftp_host, sftp_port, sftp_path,
                                  aws_s3_path, sftp_username, sftp_password, sftp_key):
    """Stream ``sftp_path`` from SFTP to ``aws_s3_path`` in ``bucket_name``.

    A worker process (``pull_from_sftp``) downloads chunks and passes them
    through a bounded queue, while this process uploads them as multipart
    parts, so downloading and uploading overlap.

    If the worker dies before sending its end-of-file marker, or any S3 step
    fails, the worker is stopped, the multipart upload is aborted (so S3 does
    not retain orphaned parts), and the exception is re-raised.

    :raises RuntimeError: if the SFTP worker exits without finishing the file.
    """
    # Start a worker process to get the data from SFTP. The queue holds at
    # most 10 chunks, bounding memory in this process.
    chunk_queue = multiprocessing.Queue(10)
    proc = multiprocessing.Process(
        target=pull_from_sftp,
        args=(sftp_host, sftp_port, sftp_username, sftp_password, sftp_key, sftp_path, chunk_queue),
    )
    proc.start()

    upload_id = None
    try:
        # And start reading from that worker to upload its results
        s3_connection = boto3.client('s3')
        multipart_upload = s3_connection.create_multipart_upload(Bucket=bucket_name, Key=aws_s3_path)
        upload_id = multipart_upload["UploadId"]
        parts = []
        while True:
            chunk = _next_chunk(chunk_queue, proc)
            if chunk is None:
                break
            part = send_chunk_to_s3(
                s3_connection,
                multipart_upload,
                bucket_name,
                aws_s3_path,
                len(parts) + 1,  # S3 part numbers are 1-based
                chunk,
            )
            parts.append(part)
        # All done, clean up and finalize the multipart upload
        proc.join()
        resp = s3_connection.complete_multipart_upload(
            Bucket=bucket_name,
            Key=aws_s3_path,
            UploadId=upload_id,
            MultipartUpload={"Parts": parts},
        )
    except BaseException:
        if proc.is_alive():
            # It may be blocked on a full queue; don't wait on it forever.
            proc.terminate()
        proc.join()
        if upload_id is not None:
            s3_connection.abort_multipart_upload(
                Bucket=bucket_name, Key=aws_s3_path, UploadId=upload_id)
        raise
    print(resp)
if __name__ == '__main__':
    # Fill these in before running. Set sftp_password to None to
    # authenticate with the RSA key file at sftp_key instead.
    sftp_host = 'TODO'
    sftp_port = 22
    sftp_path = 'TODO'
    sftp_username = 'TODO'
    sftp_password = 'TODO'
    sftp_key = None
    bucket_name = 'TODO'
    aws_s3_path = 'TODO'
    transfer_file_from_sftp_to_s3(
        bucket_name=bucket_name,
        sftp_host=sftp_host,
        # Pass the configured port instead of a hard-coded 22.
        sftp_port=sftp_port,
        sftp_path=sftp_path,
        aws_s3_path=aws_s3_path,
        sftp_username=sftp_username,
        sftp_password=sftp_password,
        sftp_key=sftp_key
    )
在我的本地機器上測試,下載和上傳速度都在 40 MB/s 左右,明顯好於沒有修復這些 bug 時的速度。
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/386943.html
