Lambda Python S3で5GB以上のファイルをコピーする
Lambda Python S3で5GB以上のファイルをコピーする:
Lambda+PythonでS3のファイルをコピーしようとして苦労した話。(PythonでローカルからS3にアップロードするコードは書いたことがあります)
サイズ30バイトでチャンク10バイトの場合
0-9
10-19
20-29
となります。0-10, 10-20 とすると、出来たファイルが少しおかしくなりました。最後の 20-29 を 20-30 にするとエラーになりました。(要は苦労したという話)
上記の方法では20GBのコピーに7分かかりました。Lambdaの制限時間は15分です。下記の高速版では、20GBのコピーで7分から1分に短縮できました。
サイズ60GBの動画ファイルをコピーしたいのですが、5GB以上のマルチパート自体がマイナーなのか、あまり情報がないですね。今回の目的は移動(コピー&削除)をしたいのです。オンプレのサーバーならファイルをコピーするだけでCPUが100%になるでしょう。S3は無限に増えるので、ストレージの増設も不要です。LambdaとS3でサーバーレスというのも好感触。
Lambda+PythonでS3のファイルをコピーしようとして苦労した話。(PythonでローカルからS3にアップロードするコードは書いたことがあります)
- 5GB以上不可(同じS3内なのに)
- Lambdaは15分制限(2018/10)
- チャンク処理が面倒
- Lambdaでスレッド処理
通常のコピー(5GB以上不可)
s3 = boto3.resource('s3')
s3.Object(new_bucket, new_file).copy_from(CopySource={'Bucket':old_bucket, 'Key':old_file})
マルチパートコピー(遅い版)
サイズ30バイトでチャンク10バイトの場合0-9
10-19
20-29
となります。0-10, 10-20 とすると、出来たファイルが少しおかしくなりました。最後の 20-29 を 20-30 にするとエラーになりました。(要は苦労したという話)
# Copy 5 GB or more in S3
def multipart_copy(new_bucket, new_file, old_bucket, old_file):
s3 = boto3.resource('s3')
mp = s3.Object(new_bucket, new_file).initiate_multipart_upload(new_file)
parts = []
part_byte = 1 * 1024 * 1024 * 1024
for part_num in range(1,100):
# 0-9
# 10-19
# 20-29
first_byte = (part_num-1) * part_byte
last_byte = (part_num * part_byte) - 1
bLast = False
if (size_byte-1) < last_byte:
last_byte = (size_byte-1)
bLast = True
part_copy = mp.Part(part_num)
response = part_copy.copy_from( CopySource={'Bucket':old_bucket, 'Key':old_file},
CopySourceRange='bytes='+ str(first_byte) +'-'+ str(last_byte) )
part_etag = response['CopyPartResult']['ETag']
parts.append({
'PartNumber': part_num,
'ETag': part_etag
})
if bLast == True:
updated_object = mp.complete(MultipartUpload={'Parts': parts})
break
マルチパートコピー(スレッド高速版)
上記の方法では20GBのコピーに7分かかりました。Lambdaの制限時間は15分です。下記の高速版では、20GBのコピーで7分から1分に短縮できました。- 3GB以下なら通常コピー
- チャンクサイズ1GB
- スレッド数は8個まで(簡易)
#-----------------------------------------------------------
# S3 Multipart Copy
# 3 GB or more with multipart
# Chunk size 1 GB
# Up to 8 threads
# 1 min with 20 GB copy (7 min without thread)
#-----------------------------------------------------------
import boto3
import threading
from time import sleep
class S3MultipartCopy:
# copy
def copy(self, new_bucket, new_file, old_bucket, old_file, size_byte):
ret = False
if size_byte < 3 * 1024 * 1024 * 1024:
s3 = boto3.resource('s3')
s3.Object(new_bucket, new_file).copy_from(CopySource={'Bucket':old_bucket, 'Key':old_file})
ret = True
else:
ret = multipart_copy_async(self, new_bucket, new_file, old_bucket, old_file, size_byte)
return ret
# multipart_copy_async
def multipart_copy_async(self, new_bucket, new_file, old_bucket, old_file, size_byte):
ret = False
try:
s3 = boto3.resource('s3')
self.mp = s3.Object(new_bucket, new_file).initiate_multipart_upload(new_file)
self.parts = []
thread_list = []
for part_num in range(1,100):
# 0-9
# 10-19
# 20-29 (Attention to -1)
part_byte = 1 *1024 * 1024 * 1024
first_byte = (part_num-1) * part_byte
last_byte = (part_num * part_byte) - 1
bLast = False
if (size_byte-1) < last_byte:
last_byte = (size_byte-1)
bLast = True
thread_1 = threading.Thread(target=self._upload_part, args=[new_bucket, new_file, old_bucket, old_file, part_num, first_byte, last_byte])
thread_1.start()
thread_list.append(thread_1)
if bLast == True:
break
# thread 8 (timeout 10sec)
for wait in range(10):
sleep(1)
count = sum( 1 for t in thread_list if t.is_alive() )
if count<8:
break
# Wait till the end
for t in thread_list:
t.join(150)
# Sort required
self.parts = sorted(self.parts, key=lambda x:x['PartNumber'])
if len(self.parts)==len(thread_list):
# complete
updated_object = self.mp.complete(MultipartUpload={'Parts': self.parts})
ret = True
else:
logger.error('abort_multipart_upload parts=' + str(len(self.parts)) + ' threads=' + str(len(thread_list)) )
boto3.client('s3').abort_multipart_upload(
Bucket=new_bucket,
Key=new_file,
UploadId=self.mp.id)
except Exception as e:
logger.debug('Upload failed Exception(%s)', e.args)
raise
return ret
# multipart_copy single (Called from thread)
def _upload_part(self, new_bucket, new_file, old_bucket, old_file, part_num, first_byte, last_byte, amount_of_retries=2):
def _upload_part_retry(retries_left=amount_of_retries):
try:
part_copy = self.mp.Part(part_num)
response = part_copy.copy_from( CopySource={'Bucket':old_bucket, 'Key':old_file},
CopySourceRange='bytes='+ str(first_byte) +'-'+ str(last_byte) )
part_etag = response['CopyPartResult']['ETag']
self.parts.append({
'PartNumber': part_num,
'ETag': part_etag
})
except Exception as exc:
if retries_left:
sleep(1)
logger.debug('Upload retry')
_upload_part_retry(retries_left=retries_left-1)
else:
raise exc
_upload_part_retry()
コメント
コメントを投稿