Lambda Python S3で5GB以上のファイルをコピーする
Lambda Python S3で5GB以上のファイルをコピーする:
Lambda+PythonでS3のファイルをコピーしようとして苦労した話。(PythonでローカルからS3にアップロードするコードは書いたことがあります)
サイズ30バイトでチャンク10バイトの場合
0-9
10-19
20-29
となります。0-10, 10-20 とすると、出来たファイルが少しおかしくなりました。最後の 20-29 を 20-30 にするとエラーになりました。(要は苦労したという話)
上記の方法では20GBのコピーに7分かかりました。Lambdaの制限時間は15分です。下記の高速版では、20GBのコピーで7分から1分に短縮できました。
サイズ60GBの動画ファイルをコピーしたいのですが、5GB以上のマルチパート自体がマイナーなのか、あまり情報がないですね。今回の目的は移動(コピー&削除)をしたいのです。オンプレのサーバーならファイルをコピーするだけでCPUが100%になるでしょう。S3は無限に増えるので、ストレージの増設も不要です。LambdaとS3でサーバーレスというのも好感触。
Lambda+PythonでS3のファイルをコピーしようとして苦労した話。(PythonでローカルからS3にアップロードするコードは書いたことがあります)
- 5GB以上不可(同じS3内なのに)
- Lambdaは15分制限(2018/10)
- チャンク処理が面倒
- Lambdaでスレッド処理
通常のコピー(5GB以上不可)
s3 = boto3.resource('s3') s3.Object(new_bucket, new_file).copy_from(CopySource={'Bucket':old_bucket, 'Key':old_file})
マルチパートコピー(遅い版)
サイズ30バイトでチャンク10バイトの場合0-9
10-19
20-29
となります。0-10, 10-20 とすると、出来たファイルが少しおかしくなりました。最後の 20-29 を 20-30 にするとエラーになりました。(要は苦労したという話)
# Copy 5 GB or more in S3 def multipart_copy(new_bucket, new_file, old_bucket, old_file): s3 = boto3.resource('s3') mp = s3.Object(new_bucket, new_file).initiate_multipart_upload(new_file) parts = [] part_byte = 1 * 1024 * 1024 * 1024 for part_num in range(1,100): # 0-9 # 10-19 # 20-29 first_byte = (part_num-1) * part_byte last_byte = (part_num * part_byte) - 1 bLast = False if (size_byte-1) < last_byte: last_byte = (size_byte-1) bLast = True part_copy = mp.Part(part_num) response = part_copy.copy_from( CopySource={'Bucket':old_bucket, 'Key':old_file}, CopySourceRange='bytes='+ str(first_byte) +'-'+ str(last_byte) ) part_etag = response['CopyPartResult']['ETag'] parts.append({ 'PartNumber': part_num, 'ETag': part_etag }) if bLast == True: updated_object = mp.complete(MultipartUpload={'Parts': parts}) break
マルチパートコピー(スレッド高速版)
上記の方法では20GBのコピーに7分かかりました。Lambdaの制限時間は15分です。下記の高速版では、20GBのコピーで7分から1分に短縮できました。- 3GB以下なら通常コピー
- チャンクサイズ1GB
- スレッド数は8個まで(簡易)
#----------------------------------------------------------- # S3 Multipart Copy # 3 GB or more with multipart # Chunk size 1 GB # Up to 8 threads # 1 min with 20 GB copy (7 min without thread) #----------------------------------------------------------- import boto3 import threading from time import sleep class S3MultipartCopy: # copy def copy(self, new_bucket, new_file, old_bucket, old_file, size_byte): ret = False if size_byte < 3 * 1024 * 1024 * 1024: s3 = boto3.resource('s3') s3.Object(new_bucket, new_file).copy_from(CopySource={'Bucket':old_bucket, 'Key':old_file}) ret = True else: ret = multipart_copy_async(self, new_bucket, new_file, old_bucket, old_file, size_byte) return ret # multipart_copy_async def multipart_copy_async(self, new_bucket, new_file, old_bucket, old_file, size_byte): ret = False try: s3 = boto3.resource('s3') self.mp = s3.Object(new_bucket, new_file).initiate_multipart_upload(new_file) self.parts = [] thread_list = [] for part_num in range(1,100): # 0-9 # 10-19 # 20-29 (Attention to -1) part_byte = 1 *1024 * 1024 * 1024 first_byte = (part_num-1) * part_byte last_byte = (part_num * part_byte) - 1 bLast = False if (size_byte-1) < last_byte: last_byte = (size_byte-1) bLast = True thread_1 = threading.Thread(target=self._upload_part, args=[new_bucket, new_file, old_bucket, old_file, part_num, first_byte, last_byte]) thread_1.start() thread_list.append(thread_1) if bLast == True: break # thread 8 (timeout 10sec) for wait in range(10): sleep(1) count = sum( 1 for t in thread_list if t.is_alive() ) if count<8: break # Wait till the end for t in thread_list: t.join(150) # Sort required self.parts = sorted(self.parts, key=lambda x:x['PartNumber']) if len(self.parts)==len(thread_list): # complete updated_object = self.mp.complete(MultipartUpload={'Parts': self.parts}) ret = True else: logger.error('abort_multipart_upload parts=' + str(len(self.parts)) + ' threads=' + str(len(thread_list)) ) boto3.client('s3').abort_multipart_upload( Bucket=new_bucket, Key=new_file, UploadId=self.mp.id) except Exception as e: logger.debug('Upload failed Exception(%s)', e.args) raise return ret # multipart_copy single (Called from thread) def _upload_part(self, new_bucket, new_file, old_bucket, old_file, part_num, first_byte, last_byte, amount_of_retries=2): def _upload_part_retry(retries_left=amount_of_retries): try: part_copy = self.mp.Part(part_num) response = part_copy.copy_from( CopySource={'Bucket':old_bucket, 'Key':old_file}, CopySourceRange='bytes='+ str(first_byte) +'-'+ str(last_byte) ) part_etag = response['CopyPartResult']['ETag'] self.parts.append({ 'PartNumber': part_num, 'ETag': part_etag }) except Exception as exc: if retries_left: sleep(1) logger.debug('Upload retry') _upload_part_retry(retries_left=retries_left-1) else: raise exc _upload_part_retry()
コメント
コメントを投稿