본문 바로가기
Python

Python - Boto3 S3 대량 데이터 가져오기, paginator

by 올엠 2024. 3. 3.
반응형

Boto3 라이브러리는 Python에서 AWS 리소스를 사용하기 위해 가장 많이 사용되는 라이브러리로 여기에서 최근 알게된 팁을 하나 정리해 본다.

아마 가장 많이 사용하는 기본적인 코드 방식은 다음과 같을 것이다.

import boto3


client = boto3.client('s3')
my_bucket = s3.Bucket('bucket_name')

for file in my_bucket.objects.all():
    print(file.key)

위 코드를 통해 기본적으로 쿼리를 진행하면 한번에 최대 1000개의 데이터만 가져올 수 있다.

하지만 기업에서 사용하는 데이터를 보통 더 많은 데이터를 보관하고 있기 때문에 1000개의 제한을 해결할 수 있는 방식을 추가로 제공하는데, ListObjectV2를 이용하여 해결 할 수 있다. 

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Paginator.ListObjectsV2

위 기술 문서를 보면 보다 자세히 정리된 내용을 알 수 있지만, 요약해 보면 게시판 Paginator 같이 여러 페이지를 통해 다량의 데이터를 한번에 쿼리해서 가져올 수 있다는 것이다.

 

기본적인 사용방법은 아래 와 같다.

paginator.paginate(Bucket='bucket')

import boto3


client = boto3.client('s3')
paginator = client.get_paginator('list_objects_v2')
page_iterator = paginator.paginate(Bucket='bucket')


for page in page_iterator:
    print(page['Contents'])

 

만약 이를 통한 샘플 코드를 작성한다면 다음과 같이 작성할 수 있다.

아래 코드의 특징은 bucket 내 특정 경로만 받거나 전체 경로를 받을 수 있도록 옵션을 통해 조정한다는 것이다.

import boto3
from os import path, makedirs
import time
import argparse
import subprocess

def download_all_objects_in_folder(s3bucket, s3path, local_dir):
    client = boto3.client('s3')
    paginator = client.get_paginator('list_objects_v2')
    if s3path == '/':
        response_iterator = paginator.paginate(Bucket=s3bucket)
    else:
        response_iterator = paginator.paginate(Bucket=s3bucket, Prefix=s3path)
    for page in response_iterator:
        for content in page['Contents']:
            filename = content['Key']
            if filename.endswith("/"):
                continue
            destination = path.join(local_dir, filename)
            print(destination)
            if not path.exists(path.dirname(destination)):
                makedirs(path.dirname(destination))
            client.download_file(s3bucket, filename, destination)
            # gz인 경우 압축해제
            if destination.endswith('.gz'):
                output = subprocess.Popen(['tar', '-xzf', filename])
                output.wait()
            # zip인 경우 압축해제
            if destination.endswith('.zip'):
                output = subprocess.Popen(['unzip', filename, '-d', path.dirname(destination)])
                output.wait()

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    # S3 버켓 이름을 지정함
    parser.add_argument('-s3b', '--s3bucket', help="get s3bucket name")
    # s3 버켓에서 특정 디렉토리 하위로 받고자 할때, 옵션을 사용하지 않으면 s3bucket 전체를 받는다.
    parser.add_argument('-s3p', '--s3path', help="get s3path in s3bucket, if not use option download all of bucket")
    # 로컬 디렉토리 지정... 절대 경로로 사용하는 것을 추천함
    parser.add_argument('-p', '--path', help="localpath")
    args = parser.parse_args()

    if args.s3bucket:
        if args.s3path:
            if args.path:
                download_all_objects_in_folder(args.s3bucket, args.s3path, args.path)
            else:
                download_all_objects_in_folder(args.s3bucket, args.s3path, './')
        else:
            if args.path:
                download_all_objects_in_folder(args.s3bucket, '/', args.path)
            else:
                download_all_objects_in_folder(args.s3bucket, '/', './')
 
반응형