Source code for airflow_plugins.operators.csv

import logging
import os

from airflow_plugins.operators import BashOperator


[docs]class CSVLook(BashOperator): """Get stats of the CSV file""" bash_command = """ csvlook {{ params.extra }} {{ params.path }} """
[docs]class CSVSQL(BashOperator): """Use csvsql tool for migration CSV to SQL. For more parameters check csvsql.""" bash_command = """ csvsql {{ params.extra }} {{ params.path }} """
[docs]class CSVtoDB(BashOperator): """Use csvsql tool for migration csv to SQL database. For more parameters check csvsql.""" bash_command = """ csvsql {{ params.extra }} \ {%- if params.db %} --db="{{ params.db }}/{{ params.company|lower }} \ {%- if params.company %}_{%- endif %}{{ params.database_name }}" \ --no-inference -y 200 --insert --tables {{ params.get("table_name", "import") }} \ {%- endif %} {{ params.local_path }} """ # noqa
class DBtoCSV(BashOperator): bash_command = """ sql2csv {{ params.extra }} --query "{{ params.query }}" \ {%- if params.db %} --db="{{ params.db }}/{{ params.company|lower }} \ {%- if params.company %}_{%- endif %}{{ params.database_name }}" \ {%- endif %} > {{ params.output_path_temp }} """ # noqa
[docs]class CSVStats(BashOperator): """Get stats of the CSV file Use csvstat. """ bash_command = """ csvstat {{ params.extra }} {{ params.path }} """
[docs]class SplitCSVtoDB(CSVtoDB): """Split CSV and upload to DB. """ @staticmethod def _split_file(filepath, n): if n <= 1: return files = [open('{}.{}'.format(filepath, i), mode='w') for i in range(n)] with open(filepath, mode='r') as f: line = f.readline() for file in files: # header line file.write(line) i = 0 line = f.readline() while line: files[i].write(line) line = f.readline() i = (i + 1) % n for file in files: file.close() @staticmethod def _determine_splits(filepath): size = os.stat(filepath).st_size # splits as hundreds of megabytes splits = size // (100 * 1000 * 1000) + 1 logging.info('File size: {} bytes ==> {} splits'.format( size, splits if splits > 1 else 'no')) return splits def pre_execute(self, context): filepath = context['params']['local_path'] self._splits = self._determine_splits(filepath) try: self._split_file(filepath, self._splits) except Exception as e: self._splits = 0 logging.warning('Splitting the input file failed: {}'.format(e)) logging.info('Trying to load the whole file.') if self._splits > 1: self.bash_command = 'for i in $(seq 0 {}); do {}.$i; done'.format( self._splits - 1, self.bash_command.strip()) def post_execute(self, context): filepath = context['params']['local_path'] if self._splits > 1: for i in range(self._splits): file = '{}.{}'.format(filepath, i) try: os.remove(file) except Exception as e: # it's ok, these are just helper files logging.warning('Unable to delete file' '{}: {}'.format(file, e))