Source code for slack.io.abc

import json
import time
import logging
from typing import Tuple, Union, Optional, AsyncIterator, MutableMapping

from .. import events, sansio, methods, exceptions

LOG = logging.getLogger(__name__)


[docs]class SlackAPI: """ :py:term:`abstract base class` abstracting the HTTP library used to call Slack API. Built with the functions of :mod:`slack.sansio`. Args: session: HTTP session token: Slack API token headers: Default headers for all request """ def __init__(self, *, token: str, headers: Optional[MutableMapping] = None) -> None: self._token = token self._headers = headers or {} async def _request( self, method: str, url: str, headers: Optional[MutableMapping], body: Optional[Union[str, MutableMapping]], ) -> Tuple[int, bytes, MutableMapping]: raise NotImplementedError() async def _rtm(self, url: str) -> AsyncIterator[str]: yield "" raise NotImplementedError() async def sleep(self, seconds: Union[int, float]): raise NotImplementedError() async def _make_query( self, url: str, body: Optional[Union[str, MutableMapping]], headers: Optional[MutableMapping], ) -> dict: LOG.debug("Querying %s with %s, %s", url, headers, body) status, rep_body, rep_headers = await self._request("POST", url, headers, body) LOG.debug("Response from %s: %s, %s, %s", url, status, rep_body, rep_headers) response_data = sansio.decode_response(status, rep_headers, rep_body) return response_data
[docs] async def query( self, url: Union[str, methods], data: Optional[MutableMapping] = None, headers: Optional[MutableMapping] = None, as_json: Optional[bool] = None, ) -> dict: """ Query the slack API When using :class:`slack.methods` the request is made `as_json` if available Args: url: :class:`slack.methods` or url string data: JSON encodable MutableMapping headers: Custom headers as_json: Post JSON to the slack API Returns: dictionary of slack API response data """ url, body, headers = sansio.prepare_request( url=url, data=data, headers=headers, as_json=as_json, global_headers=self._headers, token=self._token, ) return await self._make_query(url, body, headers)
[docs] async def iter( self, url: Union[str, methods], data: Optional[MutableMapping] = None, headers: Optional[MutableMapping] = None, *, limit: int = 200, iterkey: Optional[str] = None, itermode: Optional[str] = None, minimum_time: Optional[int] = None, as_json: Optional[bool] = None ) -> AsyncIterator[dict]: """ Iterate over a slack API method supporting pagination When using :class:`slack.methods` the request is made `as_json` if available Args: url: :class:`slack.methods` or url string data: JSON encodable MutableMapping headers: limit: Maximum number of results to return per call. iterkey: Key in response data to iterate over (required for url string). itermode: Iteration mode (required for url string) (one of `cursor`, `page` or `timeline`) minimum_time: Minimum elapsed time (in seconds) between two calls to the Slack API (default to 0). If not reached the client will sleep for the remaining time. as_json: Post JSON to the slack API Returns: Async iterator over `response_data[key]` """ itervalue = None if not data: data = {} last_request_time = None while True: current_time = time.time() if ( minimum_time and last_request_time and last_request_time + minimum_time > current_time ): await self.sleep(last_request_time + minimum_time - current_time) data, iterkey, itermode = sansio.prepare_iter_request( url, data, iterkey=iterkey, itermode=itermode, limit=limit, itervalue=itervalue, ) last_request_time = time.time() response_data = await self.query(url, data, headers, as_json) itervalue = sansio.decode_iter_request(response_data) for item in response_data[iterkey]: yield item if not itervalue: break
[docs] async def rtm( self, url: Optional[str] = None, bot_id: Optional[str] = None ) -> AsyncIterator[events.Event]: """ Iterate over event from the RTM API Args: url: Websocket connection url bot_id: Connecting bot ID Returns: :class:`slack.events.Event` or :class:`slack.events.Message` """ while True: bot_id = bot_id or await self._find_bot_id() url = url or await self._find_rtm_url() async for event in self._incoming_from_rtm(url, bot_id): yield event url = None
async def _find_bot_id(self) -> str: """ Find the bot ID to discard incoming message from the bot itself. Returns: The bot ID """ auth = await self.query(methods.AUTH_TEST) user_info = await self.query(methods.USERS_INFO, {"user": auth["user_id"]}) bot_id = user_info["user"]["profile"]["bot_id"] LOG.info("BOT_ID is %s", bot_id) return bot_id async def _find_rtm_url(self) -> str: """ Call `rtm.connect` to find the websocket url. Returns: Url for websocket connection """ response = await self.query(methods.RTM_CONNECT) return response["url"] async def _incoming_from_rtm( self, url: str, bot_id: str ) -> AsyncIterator[events.Event]: """ Connect and discard incoming RTM event if necessary. :param url: Websocket url :param bot_id: Bot ID :return: Incoming events """ async for data in self._rtm(url): event = events.Event.from_rtm(json.loads(data)) if sansio.need_reconnect(event): break elif sansio.discard_event(event, bot_id): continue else: yield event