Source code for airflow_plugins.operators.zip

import glob
import logging
import os
from zipfile import ZipFile

from airflow.models import BaseOperator
from airflow.utils import apply_defaults

# Module author metadata.
__author__ = 'rssanders3'

# NOTE: the string below follows a statement, so it is a plain expression
# statement and not the module docstring; it only serves as a list of
# reference links for the zipfile usage in this module.
"""
Documentation References:
 - https://docs.python.org/2/library/zipfile.html
 - https://pymotw.com/2/zipfile/
"""


class ZipOperator(BaseOperator):
    """
    An operator which takes in a path to a file or folder and zips the
    contents to a location you define.

    The archive is written directly to ``path_to_save_zip``. Entries are
    stored relative to the parent directory of ``path_to_file_to_zip``, so
    the file or folder name itself is the top-level entry of the archive.
    The current working directory of the process is never changed.

    :param path_to_file_to_zip: Full path to the file or folder you want
        to Zip
    :type path_to_file_to_zip: string
    :param path_to_save_zip: Full path to where you want to save the Zip file
    :type path_to_save_zip: string
    """

    template_fields = ('path_to_file_to_zip', 'path_to_save_zip')
    template_ext = []
    # ZipOperator's Main Color: white
    # todo: find better color
    ui_color = '#ffffff'

    @apply_defaults
    def __init__(
            self,
            path_to_file_to_zip,
            path_to_save_zip,
            *args, **kwargs):
        super(ZipOperator, self).__init__(*args, **kwargs)
        self.path_to_file_to_zip = path_to_file_to_zip
        self.path_to_save_zip = path_to_save_zip

    def execute(self, context):
        """
        Create the zip archive at ``path_to_save_zip``.

        :param context: Airflow task context (not used by this operator)
        :raises IOError: if ``path_to_file_to_zip`` is neither an existing
            file nor an existing folder
        """
        logging.info("Executing ZipOperator.execute(context)")
        logging.info(
            "Path to the File to Zip provided "
            "by the User (path_to_file_to_zip): %s",
            self.path_to_file_to_zip)
        logging.info(
            "Path to save the Zip File provided by the "
            "User (path_to_save_zip) : %s",
            self.path_to_save_zip)

        # Resolve both paths up front, against the working directory the task
        # was started in. os.path.abspath also strips a trailing separator,
        # so "some/dir/" yields the base name "dir" instead of "".
        source_path = os.path.abspath(self.path_to_file_to_zip)
        target_path = os.path.abspath(self.path_to_save_zip)
        source_parent = os.path.dirname(source_path)
        logging.info("Absolute path to the File to Zip: %s", source_path)
        logging.info("Zip File Name: %s", os.path.basename(target_path))
        logging.info("Name of the File or Folder to be Zipped: %s",
                     os.path.basename(source_path))

        is_file = os.path.isfile(source_path)
        logging.info(
            "Is the File to Zip a File "
            "(else its a folder): %s", is_file)

        # Fail loudly instead of silently producing an empty archive, so that
        # airflow can mark the task as failed and schedule retries.
        if not is_file and not os.path.isdir(source_path):
            raise IOError(
                "Path to zip does not exist: '%s'" % source_path)

        # Write straight to the destination. This avoids os.chdir (a
        # process-wide side effect), avoids clobbering a same-named file
        # next to the source, and avoids os.rename, which can fail when the
        # source and destination are on different filesystems.
        with ZipFile(target_path, 'w') as zip_file:
            logging.info("Created zip file object '%s' with name '%s'",
                         zip_file, target_path)
            if is_file:
                self._write_entry(zip_file, source_path, source_parent)
            else:  # is folder
                for dirname, subdirs, files in os.walk(source_path):
                    self._write_entry(zip_file, dirname, source_parent)
                    for filename in files:
                        file_path = os.path.join(dirname, filename)
                        # Never add the archive to itself when it is being
                        # saved inside the folder that is zipped.
                        if os.path.abspath(file_path) == target_path:
                            continue
                        self._write_entry(zip_file, file_path, source_parent)

            # todo: print out contents and results of zip file creation
            # (compression ratio, size, etc)

            # The with-statement closes the archive on exit.
            logging.info("Closing Zip File Object")

        logging.info("Saved zip file to '%s'", target_path)
        logging.info("Finished executing ZipOperator.execute(context)")

    @staticmethod
    def _write_entry(zip_file, path, base_dir):
        """Add ``path`` to ``zip_file`` under its name relative to ``base_dir``."""
        arcname = os.path.relpath(path, base_dir)
        logging.info("Writing '%s' to zip file", arcname)
        zip_file.write(path, arcname)

class UnzipOperator(BaseOperator):
    """
    An operator which takes in a path to a zip file and unzips the contents
    to a location you define.

    :param path_to_zip_file: Full path to the zip file you want to Unzip.
        Overridden when ``path_to_zip_folder`` is given.
    :type path_to_zip_file: string
    :param path_to_zip_folder: Folder to search for zip files. When set, the
        matching file with the greatest ``os.path.getctime`` is unzipped.
    :type path_to_zip_folder: string
    :param path_to_zip_folder_pattern: Glob pattern, relative to
        ``path_to_zip_folder``, used to find candidate zip files
    :type path_to_zip_folder_pattern: string
    :param path_to_unzip_contents: Full path to where you want to save the
        contents of the Zip file you're Unzipping
    :type path_to_unzip_contents: string
    """

    template_fields = ('path_to_zip_file', 'path_to_unzip_contents')
    template_ext = []
    # UnzipOperator's Main Color: white
    # todo: find better color
    ui_color = '#ffffff'

    @apply_defaults
    def __init__(
            self,
            path_to_zip_file=None,
            path_to_zip_folder=None,
            path_to_zip_folder_pattern='*.zip',
            path_to_unzip_contents=None,
            *args, **kwargs):
        super(UnzipOperator, self).__init__(*args, **kwargs)
        self.path_to_zip_folder = path_to_zip_folder
        self.path_to_zip_folder_pattern = path_to_zip_folder_pattern
        self.path_to_zip_file = path_to_zip_file
        self.path_to_unzip_contents = path_to_unzip_contents

    def execute(self, context):
        """
        Extract the selected zip file into ``path_to_unzip_contents``.

        :param context: Airflow task context (not used by this operator)
        :raises ValueError: if ``path_to_zip_folder`` is set but no file
            matches the pattern, or if no zip file was specified at all
        """
        logging.info("Executing UnzipOperator.execute(context)")
        logging.info("path_to_zip_file: %s", self.path_to_zip_file)
        logging.info("path_to_unzip_contents: %s",
                     self.path_to_unzip_contents)

        # populate path_to_zip_file if zip directory is specified
        if self.path_to_zip_folder:
            search_pattern = os.path.join(
                self.path_to_zip_folder, self.path_to_zip_folder_pattern)
            directory_files = glob.glob(search_pattern)
            # max() on an empty list raises an opaque "max() arg is an empty
            # sequence"; raise the same exception type with a useful message.
            if not directory_files:
                raise ValueError(
                    "No files matching '%s' were found" % search_pattern)
            self.path_to_zip_file = max(
                directory_files, key=os.path.getctime)
            logging.info("Zip file selected from folder: %s",
                         self.path_to_zip_file)

        if not self.path_to_zip_file:
            raise ValueError(
                "Either path_to_zip_file or path_to_zip_folder "
                "must be provided")

        # No check is done if the zip file is valid so that the operator fails
        # when expected so that airflow can properly mark the task as failed
        # and schedule retries as needed
        with ZipFile(self.path_to_zip_file, 'r') as zip_file:
            logging.info("Created zip file object '%s' from path '%s'",
                         zip_file, self.path_to_zip_file)
            logging.info("Extracting all the contents to '%s'",
                         self.path_to_unzip_contents)
            zip_file.extractall(self.path_to_unzip_contents)
            # The with-statement closes the archive on exit.
            logging.info("Closing Zip File Object")

        logging.info("Finished executing UnzipOperator.execute(context)")