"""Delete the oldest objects from every S3 bucket on an endpoint, keeping
the newest DEFAULT_FILES_TO_KEEP objects per bucket.

Connection settings come from the S3_ENDPOINT / S3_ACCESS_KEY /
S3_SECRET_KEY environment variables, read once at import time.
"""
import os
import sys

import boto3

DEFAULT = ''
S3_ENDPOINT = os.getenv('S3_ENDPOINT', DEFAULT)
S3_ACCESS_KEY = os.getenv('S3_ACCESS_KEY', DEFAULT)
S3_SECRET_KEY = os.getenv('S3_SECRET_KEY', DEFAULT)

# Number of most-recent objects to keep in each bucket.
DEFAULT_FILES_TO_KEEP = 5


def make_s3_client():
    """Build an S3 client from the configured endpoint and credentials."""
    return boto3.client(
        's3',
        endpoint_url=S3_ENDPOINT,
        aws_access_key_id=S3_ACCESS_KEY,
        aws_secret_access_key=S3_SECRET_KEY,
    )


def validate_env_vars():
    """Exit with status 1 (after printing which one) if any required
    environment variable is unset or empty."""
    for name, value in (
        ('S3_ENDPOINT', S3_ENDPOINT),
        ('S3_ACCESS_KEY', S3_ACCESS_KEY),
        ('S3_SECRET_KEY', S3_SECRET_KEY),
    ):
        if value == '':
            print(f'{name} is not set')
            sys.exit(1)


def clean_files():
    """For each bucket, delete the oldest objects so that at most
    DEFAULT_FILES_TO_KEEP (the newest by LastModified) remain.

    Errors on a single bucket or a single delete are logged and skipped
    (best-effort), matching the original behavior.
    """
    print('Cleaning files...')
    buckets = get_buckets()
    if not buckets:
        print('No buckets found')
        return
    # One shared client for all buckets — the original rebuilt a client
    # on every loop iteration.
    s3 = make_s3_client()
    for bucket in buckets:
        try:
            bucket_name = bucket['Name']
            print(f'Bucket: {bucket_name}')
            # Paginate: list_objects_v2 returns at most 1000 keys per call.
            # Use .get('Contents', []) because the key is absent entirely
            # for an empty bucket (the original raised KeyError there).
            files = []
            for page in s3.get_paginator('list_objects_v2').paginate(
                    Bucket=bucket_name):
                files.extend(page.get('Contents', []))
            if len(files) > DEFAULT_FILES_TO_KEEP:
                # Oldest first, so the leading entries are the ones to drop.
                files.sort(key=lambda obj: obj['LastModified'])
                for obj in files[:len(files) - DEFAULT_FILES_TO_KEEP]:
                    # Bind the key before the try so the except handler
                    # can always report it (original could hit an unbound
                    # file_name in its handler).
                    file_name = obj['Key']
                    try:
                        print(f'Deleting file: {file_name}')
                        s3.delete_object(Bucket=bucket_name, Key=file_name)
                        print(f'File deleted: {file_name}')
                    except Exception as e:
                        # Best-effort: keep deleting the remaining files.
                        print(f'Error deleting file: {file_name}')
                        print(e)
            else:
                print('No files to delete')
        except Exception as e:
            # Skip this bucket and continue with the rest.
            print('Error processing bucket')
            print(e)
    print('Files cleaned')


def get_buckets():
    """Return the list of bucket dicts for this endpoint.

    Exits the process with status 1 if the listing fails, since nothing
    further can be done without it.
    """
    try:
        s3 = make_s3_client()
        response = s3.list_buckets()
        return response['Buckets']
    except Exception as e:
        print('Error getting buckets')
        print(e)
        sys.exit(1)


if __name__ == '__main__':
    # Validate the environment variables, then clean the files.
    validate_env_vars()
    clean_files()