Source code for slack.sansio

"""
Collection of functions for sending and decoding request to or from the slack API
"""

import cgi
import hmac
import json
import time
import base64
import hashlib
import logging
from typing import Tuple, Union, Optional, MutableMapping

from . import HOOK_URL, ROOT_URL, events, methods, exceptions

LOG = logging.getLogger(__name__)

RECONNECT_EVENTS = ("team_migration_started", "goodbye")
"""Events type preceding a disconnection"""

SKIP_EVENTS = ("reconnect_url",)
"""Events that do not need to be dispatched"""

ITERMODE = ("cursor", "page", "timeline")
"""Supported pagination mode"""


[docs]def raise_for_status( status: int, headers: MutableMapping, data: MutableMapping ) -> None: """ Check request response status Args: status: Response status headers: Response headers data: Response data Raises: :class:`slack.exceptions.RateLimited`: For 429 status code :class:`slack.exceptions:HTTPException`: """ if status != 200: if status == 429: if isinstance(data, str): error = data else: error = data.get("error", "ratelimited") try: retry_after = int(headers.get("Retry-After", 1)) except ValueError: retry_after = 1 raise exceptions.RateLimited(retry_after, error, status, headers, data) else: raise exceptions.HTTPException(status, headers, data)
[docs]def raise_for_api_error(headers: MutableMapping, data: MutableMapping) -> None: """ Check request response for Slack API error Args: headers: Response headers data: Response data Raises: :class:`slack.exceptions.SlackAPIError` """ if not data["ok"]: raise exceptions.SlackAPIError(data.get("error", "unknow_error"), headers, data) if "warning" in data: LOG.warning("Slack API WARNING: %s", data["warning"])
[docs]def decode_body(headers: MutableMapping, body: bytes) -> dict: """ Decode the response body For 'application/json' content-type load the body as a dictionary Args: headers: Response headers body: Response body Returns: decoded body """ type_, encoding = parse_content_type(headers) decoded_body = body.decode(encoding) # There is one api that just returns `ok` instead of json. In order to have a consistent API we decided to modify the returned payload into a dict. if type_ == "application/json": payload = json.loads(decoded_body) else: if decoded_body == "ok": payload = {"ok": True} else: payload = {"ok": False, "data": decoded_body} return payload
[docs]def parse_content_type(headers: MutableMapping) -> Tuple[Optional[str], str]: """ Find content-type and encoding of the response Args: headers: Response headers Returns: :py:class:`tuple` (content-type, encoding) """ content_type = headers.get("content-type") if not content_type: return None, "utf-8" else: type_, parameters = cgi.parse_header(content_type) encoding = parameters.get("charset", "utf-8") return type_, encoding
[docs]def prepare_request( url: Union[str, methods], data: Optional[MutableMapping], headers: Optional[MutableMapping], global_headers: MutableMapping, token: str, as_json: Optional[bool] = None, ) -> Tuple[str, Union[str, MutableMapping], MutableMapping]: """ Prepare outgoing request Create url, headers, add token to the body and if needed json encode it Args: url: :class:`slack.methods` item or string of url data: Outgoing data headers: Custom headers global_headers: Global headers token: Slack API token as_json: Post JSON to the slack API Returns: :py:class:`tuple` (url, body, headers) """ if isinstance(url, methods): as_json = as_json or url.value[3] real_url = url.value[0] else: real_url = url as_json = False if not headers: headers = {**global_headers} else: headers = {**global_headers, **headers} payload: Optional[Union[str, MutableMapping]] = None if real_url.startswith(HOOK_URL) or (real_url.startswith(ROOT_URL) and as_json): payload, headers = _prepare_json_request(data, token, headers) elif real_url.startswith(ROOT_URL) and not as_json: payload = _prepare_form_encoded_request(data, token) else: real_url = ROOT_URL + real_url payload = _prepare_form_encoded_request(data, token) return real_url, payload, headers
def _prepare_json_request( data: Optional[MutableMapping], token: str, headers: MutableMapping ) -> Tuple[str, MutableMapping]: headers["Authorization"] = f"Bearer {token}" headers["Content-type"] = "application/json; charset=utf-8" if isinstance(data, events.Message): payload = data.to_json() else: payload = json.dumps(data or {}) return payload, headers def _prepare_form_encoded_request( data: Optional[MutableMapping], token: str ) -> MutableMapping: if isinstance(data, events.Message): data = data.serialize() if not data: data = {"token": token} elif "token" not in data: data["token"] = token return data
[docs]def decode_response(status: int, headers: MutableMapping, body: bytes) -> dict: """ Decode incoming response Args: status: Response status headers: Response headers body: Response body Returns: Response data """ data = decode_body(headers, body) raise_for_status(status, headers, data) raise_for_api_error(headers, data) return data
[docs]def find_iteration( url: Union[methods, str], itermode: Optional[str] = None, iterkey: Optional[str] = None, ) -> Tuple[str, str]: """ Find iteration mode and iteration key for a given :class:`slack.methods` Args: url: :class:`slack.methods` or string url itermode: Custom iteration mode iterkey: Custom iteration key Returns: :py:class:`tuple` (itermode, iterkey) """ if isinstance(url, methods): if not itermode: itermode = url.value[1] if not iterkey: iterkey = url.value[2] if not iterkey or not itermode: raise ValueError("Iteration not supported for: {}".format(url)) elif itermode not in ITERMODE: raise ValueError("Iteration not supported for: {}".format(itermode)) return itermode, iterkey
[docs]def prepare_iter_request( url: Union[methods, str], data: MutableMapping, *, iterkey: Optional[str] = None, itermode: Optional[str] = None, limit: int = 200, itervalue: Optional[Union[str, int]] = None, ) -> Tuple[MutableMapping, str, str]: """ Prepare outgoing iteration request Args: url: :class:`slack.methods` item or string of url data: Outgoing data limit: Maximum number of results to return per call. iterkey: Key in response data to iterate over (required for url string). itermode: Iteration mode (required for url string) (one of `cursor`, `page` or `timeline`) itervalue: Value for current iteration (cursor hash, page or timestamp depending on the itermode) Returns: :py:class:`tuple` (data, iterkey, itermode) """ itermode, iterkey = find_iteration(url, itermode, iterkey) if itermode == "cursor": data["limit"] = limit if itervalue: data["cursor"] = itervalue elif itermode == "page": data["count"] = limit if itervalue: data["page"] = itervalue elif itermode == "timeline": data["count"] = limit if itervalue: data["latest"] = itervalue return data, iterkey, itermode
[docs]def decode_iter_request(data: dict) -> Optional[Union[str, int]]: """ Decode incoming response from an iteration request Args: data: Response data Returns: Next itervalue """ if "response_metadata" in data: return data["response_metadata"].get("next_cursor") elif "paging" in data: current_page = int(data["paging"].get("page", 1)) max_page = int(data["paging"].get("pages", 1)) if current_page < max_page: return current_page + 1 elif "has_more" in data and data["has_more"] and "latest" in data: return data["messages"][-1]["ts"] return None
[docs]def discard_event(event: events.Event, bot_id: str = None) -> bool: """ Check if the incoming event needs to be discarded Args: event: Incoming :class:`slack.events.Event` bot_id: Id of connected bot Returns: boolean """ if event["type"] in SKIP_EVENTS: return True elif bot_id and isinstance(event, events.Message): if event.get("bot_id") == bot_id: LOG.debug("Ignoring event: %s", event) return True elif "message" in event and event["message"].get("bot_id") == bot_id: LOG.debug("Ignoring event: %s", event) return True return False
[docs]def need_reconnect(event: events.Event) -> bool: """ Check if RTM needs reconnecting Args: event: Incoming :class:`slack.events.Event` Returns: boolean """ if event["type"] in RECONNECT_EVENTS: return True else: return False
[docs]def validate_request_signature( body: str, headers: MutableMapping, signing_secret: str ) -> None: """ Validate incoming request signature using the application signing secret. Contrary to the ``team_id`` and ``verification_token`` verification this method is not called by ``slack-sansio`` when creating object from incoming HTTP request. Because the body of the request needs to be provided as text and not decoded as json beforehand. Args: body: Raw request body headers: Request headers signing_secret: Application signing_secret Raise: :class:`slack.exceptions.InvalidSlackSignature`: when provided and calculated signature do not match :class:`slack.exceptions.InvalidTimestamp`: when incoming request timestamp is more than 5 minutes old """ request_timestamp = int(headers["X-Slack-Request-Timestamp"]) if (int(time.time()) - request_timestamp) > (60 * 5): raise exceptions.InvalidTimestamp(timestamp=request_timestamp) slack_signature = headers["X-Slack-Signature"] calculated_signature = ( "v0=" + hmac.new( signing_secret.encode("utf-8"), f"""v0:{headers["X-Slack-Request-Timestamp"]}:{body}""".encode("utf-8"), digestmod=hashlib.sha256, ).hexdigest() ) if not hmac.compare_digest(slack_signature, calculated_signature): raise exceptions.InvalidSlackSignature(slack_signature, calculated_signature)