"""Smoke-test access to a private S3 bucket through the ``s3cmd`` command-line tool.

The script runs these steps in order and exits with status 1 as soon as one
of the s3cmd commands fails:

1. verify that the s3cmd configuration file exists,
2. list the bucket (connection / credentials check),
3. upload a small generated text file and list the bucket again,
4. upload a PNG under a "folder" prefix,
5. download that PNG back into the script directory,
6. delete the PNG from the bucket.
"""
import os
import shlex
import sys


def _print_separator():
    """Print the visual separator used between the test steps."""
    print("--------------------------------------------------")
    print(" ")


def _run_or_exit(command, error_message):
    """Run *command* through the shell and return its status.

    Prints *error_message* and exits with status 1 when the command reports
    a non-zero status.
    """
    status = os.system(command)
    if status != 0:
        print(error_message)
        sys.exit(1)
    return status


# Get the script directory and project root
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(script_dir)

# os.path.isfile() does not expand shell variables, so a literal
# '/home/$USER/.s3cfg' is never found; expand the home directory in Python.
s3cmd_config_file_path = os.path.expanduser('~/.s3cfg')
print("s3cmd config file path: ", s3cmd_config_file_path)

png_file = 'private_test.png'
S3_bucket_name = 'eo-adrop'  # 'eo-public'
png_file_path = os.path.join(script_dir, png_file)
test_folder = 'private_test_folder/'

# CHECK if the s3cmd config file exists
if not os.path.isfile(s3cmd_config_file_path):
    print(f"Error: s3cmd config file not found at {s3cmd_config_file_path}")
    sys.exit(1)
_print_separator()

# Common command prefix; local paths are shell-quoted so that directories
# containing spaces or other special characters do not break the commands.
s3cmd_base = f"s3cmd -c {shlex.quote(s3cmd_config_file_path)}"

# ---- CHECK the connection to the S3 bucket using the s3cmd command line tool ----
# LIST the content of the S3 bucket to check the connection to the bucket and
# the validity of the s3cmd config file.
s3cmd_check_command = f"{s3cmd_base} ls s3://{S3_bucket_name}/"
print("Checking connection to S3 bucket with command: ", s3cmd_check_command)
s3cmd_check_result = _run_or_exit(
    s3cmd_check_command,
    "Error: Unable to connect to S3 bucket. Please check your s3cmd configuration and network connection.",
)
print("Connection to S3 bucket successful.")
_print_separator()

# ---- PUT a file in the S3 bucket ----
test_file_path = os.path.join(script_dir, 'private_test_upload.txt')
with open(test_file_path, 'w', encoding='utf-8') as test_file:
    test_file.write('This is a test file for checking S3 upload command.')

s3cmd_put_command = (
    f"{s3cmd_base} put {shlex.quote(test_file_path)} "
    f"s3://{S3_bucket_name}/private_test_upload.txt"
)
print("THIS IS THE PRIVATE BUCKET: ", s3cmd_put_command)
s3cmd_put_result = _run_or_exit(
    s3cmd_put_command,
    "Error: Unable to upload test file to S3 bucket.",
)
print("Test file uploaded successfully to S3 bucket.")

# VERIFY that the file has been uploaded by listing the content of the bucket
s3cmd_list_command = f"{s3cmd_base} ls s3://{S3_bucket_name}/"
print("Listing content of S3 bucket with command: ", s3cmd_list_command)
s3cmd_list_result = _run_or_exit(
    s3cmd_list_command,
    "Error: Unable to list content of S3 bucket.",
)
_print_separator()

# ---- CREATE a "folder" for the test upload, reporting whether it already exists ----
# S3 doesn't have the concept of directories, i.e. the whole
# test_folder/png_file is the object name.
check_existence_command = f"{s3cmd_base} ls s3://{S3_bucket_name}/{test_folder}"
print(" ")
print(" ")
print(" ")
print("Checking if test folder exists in S3 bucket with command: ", check_existence_command)
# NOTE(review): this relies on `s3cmd ls` returning a non-zero status for a
# missing prefix -- confirm against the s3cmd version in use.
check_existence_result = os.system(check_existence_command)
if check_existence_result != 0:
    print(f"Test folder {test_folder} does not exist in S3 bucket.")
else:
    print(f"Test folder {test_folder} already exists in S3 bucket.")

# The upload is performed in both cases: the later get/delete steps need the object.
s3cmd_mkdir_command = (
    f"{s3cmd_base} put {shlex.quote(png_file_path)} "
    f"s3://{S3_bucket_name}/{test_folder}"
)
print("Creating bucket folder for test upload with command: ", s3cmd_mkdir_command)
s3cmd_mkdir_result = _run_or_exit(
    s3cmd_mkdir_command,
    "Error: Unable to create bucket folder for test upload.",
)
print("Bucket folder for test upload created successfully.")
_print_separator()

# ---- GET an object from the bucket to check the download command ----
# Report a pre-existing local copy BEFORE downloading; checked afterwards, the
# message would be printed on every successful run. The command is invoked
# with --skip-existing, so an existing local file is left in place.
downloaded_file_path = os.path.join(script_dir, png_file)
if os.path.isfile(downloaded_file_path):
    print(f"File {downloaded_file_path} already exists")
    _print_separator()

get_object_command = (
    f"{s3cmd_base} --skip-existing get "
    f"s3://{S3_bucket_name}/{test_folder}{png_file} {shlex.quote(script_dir + '/')}"
)
print("Getting an object from the S3 bucket with command: ", get_object_command)
get_object_result = _run_or_exit(
    get_object_command,
    "Error: Unable to get object from S3 bucket.",
)
print("Object downloaded successfully from S3 bucket. get_object_result: ", get_object_result)
_print_separator()

# If you want to delete the local file after the test, uncomment the following line:
# os.remove(downloaded_file_path)

# ---- DELETE the test file from the bucket after the test upload ----
delete_file_command = f"{s3cmd_base} del s3://{S3_bucket_name}/{test_folder}{png_file}"
print("Deleting the test file from the S3 bucket with command: ", delete_file_command)
delete_file_result = _run_or_exit(
    delete_file_command,
    "Error: Unable to delete test file from S3 bucket.",
)
print("Test file deleted successfully from S3 bucket. delete_file_result: ", delete_file_result)
_print_separator()