Source code for airflow_plugins.operators.slack.hooks


import json
import logging

import requests
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow_plugins import utils
from slackclient import SlackClient


[docs]class SlackHook(BaseHook): """Slack hook""" def __init__(self, token=None, method='chat.postMessage', api_params=None, channel=None, username=None, text=None, attachments=None, *args, **kwargs): self.token = token or utils.get_variable("SLACK_API_TOKEN") self.method = method self.api_params = api_params self.channel = channel or "airflow_stg" self.username = username or "Airflow (STG)" self.text = text self.attachments = attachments super(SlackHook, self).__init__(None, *args, **kwargs) @property def client(self): if not hasattr(self, "_client"): self._client = SlackClient(self.token) return self._client
[docs] def run(self, **kwargs): """ SlackAPIOperator calls will not fail even if the call is not unsuccessful. It should not prevent a DAG from completing in success. """ if not self.api_params: self.construct_api_call_params(**kwargs) rc = self.client.api_call(self.method, **self.api_params) if not rc['ok']: logging.error("Slack API call failed ({})".format(rc['error'])) raise AirflowException( "Slack API call failed: ({})".format(rc['error'])) return rc
def construct_api_call_params(self, **kwargs): self.api_params = { 'channel': self.channel } if self.username: self.api_params['username'] = self.username self.api_params['icon_url'] = \ 'https://raw.githubusercontent.com/airbnb/airflow' \ '/master/airflow/www/static/pin_100.png' if self.text: self.api_params['text'] = self.text if self.attachments: self.api_params['attachments'] = json.dumps(self.attachments) self.api_params.update(**kwargs)
[docs] def get_file_content(self, url): """Returns file content """ r = requests.get(url, headers={ 'Authorization': 'Bearer %s' % self.token }) if r.status_code == 200: return r.text
[docs] def get_channel_id(self, name): """Returns channel id by name """ rc = self.client.api_call("channels.list") for d in rc['channels']: if d['name'] == name: return d['id']