#PlChat is a pleroma chat client # Copyright (C) 2021 Knott Eye # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import re, requests, os.path, websockets, json class Account(): def __init__(self, instance, username=' ', password='', clientID=None, clientSecret=None, token=None, refresh_token=None, totpFunc=None): scheme = re.compile('https?://') self.instance = re.sub(scheme, '', instance) if(self.instance[len(self.instance) - 1] == '/'): self.instance = self.instance[0:len(self.instance) - 1] self.url = 'https://'+self.instance if(username[0] == '@'): self.username = username[1:] else: self.username = username self.password = password self.clientID = clientID self.clientSecret = clientSecret self.token = '' self.refresh_token = refresh_token if token: self.token = token self._chatFlag = False self._instanceInfo = None self.flakeid = None self.acct = None self.totpFunc = totpFunc self.chat_update = None # start streaming chat_updates async def startStreaming(self, chat_update=None): if not self.token: return False if chat_update: self.chat_update = chat_update uri = "wss://"+self.instance+'/api/v1/streaming?access_token='+self.token+'&stream=user:pleroma_chat' async with websockets.connect(uri) as websocket: while True: res = json.loads(await websocket.recv()) if self.chat_update: self.chat_update(self, res) # set streaming event handler def setChatUpdate(self, chat_update): self.chat_update = chat_update # register app if we don't have one def register(self): if not self.clientID or not self.clientSecret: request_data = { 'client_name': "plchat", 'scopes': "read write follow push", 'redirect_uris': "urn:ietf:wg:oauth:2.0:oob" } response = self.apiRequest('POST', '/api/v1/apps', request_data) self.clientID, self.clientSecret = (response['client_id'], response['client_secret']) return self.clientID, self.clientSecret # get oauth token def getToken(self): request_data = { 'grant_type': 'password', 'username': self.username, 'password': self.password, 'client_id': self.clientID, 'client_secret': self.clientSecret, 'scopes': "read write follow push", 'redirect_uris': "urn:ietf:wg:oauth:2.0:oob" } response = self.apiRequest('POST', '/oauth/token', request_data) if 'error' in response and response['error'] == 'mfa_required': if response['supported_challenge_types'] == 'totp' or 'totp' in response['supported_challenge_types']: mfa_code, code_type = self.totpFunc(self, response['supported_challenge_types']) response = self.apiRequest('POST', '/oauth/mfa/challenge', { 'client_id': self.clientID, 'client_secret': self.clientSecret, 'mfa_token': response['mfa_token'], 'challenge_type': code_type, 'code': mfa_code }) elif 'error' in response: raise ValueError self.token = response['access_token'] self.refresh_token = response['refresh_token'] r = self.apiRequest('GET', '/api/v1/accounts/verify_credentials') self.flakeid = r['id'] self.acct = r return response # Ensure we have have some credentials def login(self): r = self.apiRequest('GET', '/api/v1/accounts/verify_credentials') if 'id' in r: self.flakeid = r['id'] self.acct = r if 'error' in r: if not self.refresh_token: return self.getToken() else: r = self.apiRequest('POST', '/oauth/token', { 'grant_type': 'refresh_token', 'refresh_token': self.refresh_token, 'client_id': self.clientID, 'client_secret': self.clientSecret, 'scopes': "read write follow push", 'redirect_uris': "urn:ietf:wg:oauth:2.0:oob" }) self.token = r['access_token'] self.refresh_token = r['refresh_token'] r = self.apiRequest('GET', '/api/v1/accounts/verify_credentials') self.acct = r self.flakeid = r['id'] return r return {'access_token': self.token, 'refresh_token': self.refresh_token} #pass True to always get new info def getInstanceInfo(self, f=False): if f or not self._instanceInfo: r = self.apiRequest('GET', '/api/v1/instance') self._instanceInfo = r return r else: return self._instanceInfo # enter an account name (eg lain@lain.com) get back a flakeid # get back None if we couldn't find it def getAcctID(self, account): response = self.apiRequest('GET', '/api/v1/accounts/search', {'q': account, 'limit':1, 'resolve': True}) for acct in response: if acct['acct'] == account: return acct['id'] return None # enter an account name (eg lain@lain.com) get back all account info # get back None if we couldn't find it def getAcctInfo(self, account): response = self.apiRequest('GET', '/api/v1/accounts/search', {'q': account, 'limit':1, 'resolve': True}) for acct in response: if acct['acct'] == account: return acct return None # enter a flakeid, get back a chat. will be created if not existing def addChat(self, flakeid): r = self.apiRequest('POST', '/api/v1/pleroma/chats/by-account-id/'+flakeid) return r # get a chat # this is the *chat* id, not the flakeid def getChat(self, cid): return self.apiRequest('GET', '/api/v1/pleroma/chats/'+cid) # optional page parameter, index of 0 # limit is number of items per page # if the server is too old to support pagination you will get back a list of all chats def listChats(self, page=0, limit=20, with_muted=True): page *= limit if self._chatFlag: r = self.apiRequest('GET', '/api/v1/pleroma/chats') return r r = self.apiRequest('GET', '/api/v2/pleroma/chats', {'with_muted': with_muted, 'offset': page, 'limit': limit}) if 'error' in r and r['error'] == 'Not implemented': self._chatFlag = True return self.listChats(page, limit) return r # list pages in chat, # optional page parameter, index of 0 # limit is number of items per page def getMessages(self, cid, page=0, limit=20, past=None): page *= limit if past: return self.apiRequest('GET', '/api/v1/pleroma/chats/'+cid+'/messages', {'offset': page, 'limit': limit, 'max_id': past}) return self.apiRequest('GET', '/api/v1/pleroma/chats/'+cid+'/messages', {'offset': page, 'limit': limit}) # send a message to specified chat # either conent or media id must be specified def sendMessage(self, cid, content=None, media=None): if not content and not media: return 'You fucked up.' r = self.apiRequest('POST', '/api/v1/pleroma/chats/'+cid+'/messages', {'content': content, 'media_id': media}) return r # upload a piece of media, get back an object describing it # the id attribute can be used with sendMessage() def uploadMedia(self, file): with open(file, 'rb') as f: r = self.apiRequest('POST', '/api/v1/media', None, files={'file': f}) return r # mark all messages in a chat (up to last_read_id) as read def markChatRead(self, cid, last_read_id): return self.apiRequest('POST', '/api/v1/pleroma/chats/'+cid+'/read', {'last_read_id': last_read_id}) # enter a chat it and a message id, get back the deleted message on success def deleteMessage(self, cid, mid): return self.apiRequest('DELETE', '/api/v1/pleroma/chats/'+cid+'/messages/'+mid) # internal function def apiRequest(self, method, route, data=None, **kwargs): if method == 'GET': response = requests.get(self.url + route, data, timeout=300, headers={'Authorization': 'Bearer '+self.token}, **kwargs) response = response.json() return response if method == 'POST': response = requests.post(self.url + route, data, timeout=300, headers={'Authorization': 'Bearer '+self.token}, **kwargs) response = response.json() return response if method == 'DELETE': response = requests.delete(self.url + route, data, timeout=300, headers={'Authorization': 'Bearer '+self.token}, **kwargs) response = response.json() return response