Lambda Python S3で5GB以上のファイルをコピーする

Lambda Python S3で5GB以上のファイルをコピーする:

Lambda+PythonでS3のファイルをコピーしようとして苦労した話。(PythonでローカルからS3にアップロードするコードは書いたことがあります)

  • 5GB以上不可(同じS3内なのに)
  • Lambdaは15分制限(2018/10)
  • チャンク処理が面倒
  • Lambdaでスレッド処理


通常のコピー(5GB以上不可)

s3 = boto3.resource('s3') 
s3.Object(new_bucket, new_file).copy_from(CopySource={'Bucket':old_bucket, 'Key':old_file}) 


マルチパートコピー(遅い版)

サイズ30バイトでチャンク10バイトの場合

0-9

10-19

20-29

となります。0-10, 10-20 とすると、出来たファイルが少しおかしくなりました。最後の 20-29 を 20-30 にするとエラーになりました。(要は苦労したという話)

# Copy 5 GB or more in S3 
def multipart_copy(new_bucket, new_file, old_bucket, old_file): 
    s3 = boto3.resource('s3') 
    mp = s3.Object(new_bucket, new_file).initiate_multipart_upload(new_file) 
    parts = [] 
    part_byte = 1 * 1024 * 1024 * 1024  
    for part_num in range(1,100): 
        # 0-9   
        # 10-19  
        # 20-29  
        first_byte = (part_num-1) * part_byte 
        last_byte  = (part_num    * part_byte) - 1 
 
        bLast = False 
        if (size_byte-1) < last_byte: 
            last_byte = (size_byte-1) 
            bLast = True 
 
        part_copy = mp.Part(part_num) 
        response = part_copy.copy_from( CopySource={'Bucket':old_bucket, 'Key':old_file},  
                                        CopySourceRange='bytes='+ str(first_byte) +'-'+ str(last_byte) ) 
 
        part_etag = response['CopyPartResult']['ETag'] 
        parts.append({ 
            'PartNumber': part_num, 
            'ETag': part_etag 
        }) 
 
        if bLast == True: 
            updated_object = mp.complete(MultipartUpload={'Parts': parts}) 
            break    


マルチパートコピー(スレッド高速版)

上記の方法では20GBのコピーに7分かかりました。Lambdaの制限時間は15分です。下記の高速版では、20GBのコピーで7分から1分に短縮できました。

  • 3GB以下なら通常コピー
  • チャンクサイズ1GB
  • スレッド数は8個まで(簡易)
#----------------------------------------------------------- 
# S3 Multipart Copy 
# 3 GB or more with multipart 
# Chunk size 1 GB 
# Up to 8 threads 
# 1 min with 20 GB copy (7 min without thread) 
#-----------------------------------------------------------     
import boto3 
import threading 
from time import sleep 
 
class S3MultipartCopy: 
    # copy 
    def copy(self, new_bucket, new_file, old_bucket, old_file, size_byte): 
        ret = False 
        if size_byte < 3 * 1024 * 1024 * 1024:  
            s3 = boto3.resource('s3') 
            s3.Object(new_bucket, new_file).copy_from(CopySource={'Bucket':old_bucket, 'Key':old_file}) 
            ret = True 
        else: 
            ret = multipart_copy_async(self, new_bucket, new_file, old_bucket, old_file, size_byte) 
        return ret 
 
    # multipart_copy_async 
    def multipart_copy_async(self, new_bucket, new_file, old_bucket, old_file, size_byte): 
        ret = False 
        try: 
            s3 = boto3.resource('s3') 
            self.mp = s3.Object(new_bucket, new_file).initiate_multipart_upload(new_file) 
            self.parts = [] 
            thread_list = [] 
            for part_num in range(1,100): 
                # 0-9   
                # 10-19  
                # 20-29 (Attention to -1)  
                part_byte = 1 *1024 * 1024 * 1024  
                first_byte = (part_num-1) * part_byte 
                last_byte  = (part_num    * part_byte) - 1 
 
                bLast = False 
                if (size_byte-1) < last_byte: 
                    last_byte = (size_byte-1) 
                    bLast = True 
 
                thread_1 = threading.Thread(target=self._upload_part, args=[new_bucket, new_file, old_bucket, old_file, part_num, first_byte, last_byte]) 
                thread_1.start() 
                thread_list.append(thread_1) 
 
                if bLast == True: 
                    break   
 
                # thread 8 (timeout 10sec) 
                for wait in range(10): 
                    sleep(1) 
                    count = sum( 1 for t in thread_list if t.is_alive() ) 
                    if count<8: 
                        break 
 
            # Wait till the end 
            for t in thread_list: 
                t.join(150) 
 
            # Sort required    
            self.parts = sorted(self.parts, key=lambda x:x['PartNumber']) 
 
            if len(self.parts)==len(thread_list): 
                # complete 
                updated_object = self.mp.complete(MultipartUpload={'Parts': self.parts}) 
                ret = True 
            else: 
                logger.error('abort_multipart_upload parts=' + str(len(self.parts)) + ' threads=' + str(len(thread_list)) ) 
                boto3.client('s3').abort_multipart_upload( 
                        Bucket=new_bucket, 
                        Key=new_file, 
                        UploadId=self.mp.id) 
 
        except Exception as e: 
            logger.debug('Upload failed Exception(%s)', e.args) 
            raise 
        return ret 
 
    # multipart_copy single (Called from thread) 
    def _upload_part(self, new_bucket, new_file, old_bucket, old_file, part_num, first_byte, last_byte, amount_of_retries=2): 
 
        def _upload_part_retry(retries_left=amount_of_retries): 
            try: 
                part_copy = self.mp.Part(part_num)                                    
                response = part_copy.copy_from( CopySource={'Bucket':old_bucket, 'Key':old_file},  
                                                CopySourceRange='bytes='+ str(first_byte) +'-'+ str(last_byte) ) 
                part_etag = response['CopyPartResult']['ETag'] 
                self.parts.append({ 
                    'PartNumber': part_num, 
                    'ETag': part_etag 
                }) 
            except Exception as exc: 
                if retries_left: 
                    sleep(1) 
                    logger.debug('Upload retry') 
                    _upload_part_retry(retries_left=retries_left-1) 
                else: 
                    raise exc 
 
        _upload_part_retry() 


まとめ

サイズ60GBの動画ファイルをコピーしたいのですが、5GB以上のマルチパート自体がマイナーなのか、あまり情報がないですね。今回の目的は移動(コピー&削除)をしたいのです。オンプレのサーバーならファイルをコピーするだけでCPUが100%になるでしょう。S3は無限に増えるので、ストレージの増設も不要です。LambdaとS3でサーバーレスというのも好感触。

コメント

このブログの人気の投稿

投稿時間:2021-06-17 05:05:34 RSSフィード2021-06-17 05:00 分まとめ(1274件)

投稿時間:2021-06-20 02:06:12 RSSフィード2021-06-20 02:00 分まとめ(3871件)

投稿時間:2020-12-01 09:41:49 RSSフィード2020-12-01 09:00 分まとめ(69件)