# plchat/pleroma.py

# 226 lines
# 8.2 KiB
# Python

# PlChat is a Pleroma chat client
# Copyright (C) 2021 Knott Eye
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import re, requests, os.path, websockets, json
class Account():
    """Client for one account on a Pleroma instance.

    Wraps the REST routes this file uses (app registration, OAuth token
    handling, account search, chats, chat messages, media upload) and the
    ``user:pleroma_chat`` websocket stream.  Every REST call goes through
    ``apiRequest`` and returns the decoded JSON body of the response.
    """

    # Values sent with every app-registration / token request.
    _SCOPES = "read write follow push"
    _REDIRECT_URI = "urn:ietf:wg:oauth:2.0:oob"
    _VERIFY_ROUTE = '/api/v1/accounts/verify_credentials'
    _CHATS_ROUTE = '/api/v1/pleroma/chats'

    def __init__(self, instance, username=' ', password='', clientID=None, clientSecret=None, token=None, refresh_token=None, totpFunc=None):
        """Store credentials and normalise the instance address.

        instance      -- host name or URL; a leading ``http://``/``https://``
                         and trailing slashes are removed.  Requests always
                         use ``https://``.
        username      -- account name; one leading ``@`` is dropped.
        password      -- password used by ``getToken``.
        clientID / clientSecret -- OAuth app credentials (see ``register``).
        token / refresh_token   -- previously obtained OAuth tokens.
        totpFunc      -- callable invoked by ``getToken`` as
                         ``totpFunc(account, supported_challenge_types)``;
                         must return ``(code, challenge_type)``.

        Raises ValueError when ``instance`` is empty after normalisation.
        """
        # Anchor the scheme pattern so only a leading scheme is removed.
        host = re.sub(r'^https?://', '', instance).rstrip('/')
        if not host:
            raise ValueError('instance must be a non-empty host name or URL')
        self.instance = host
        self.url = 'https://' + host
        # startswith() is safe for an empty username, unlike username[0].
        self.username = username[1:] if username.startswith('@') else username
        self.password = password
        self.clientID = clientID
        self.clientSecret = clientSecret
        self.token = token or ''
        self.refresh_token = refresh_token
        self._chatFlag = False      # True once the v2 chat list returned "Not implemented"
        self._instanceInfo = None   # cached result of getInstanceInfo()
        self.flakeid = None         # 'id' of the verified account
        self.acct = None            # full verify_credentials response
        self.totpFunc = totpFunc
        self.chat_update = None     # streaming event handler

    # start streaming chat_updates
    async def startStreaming(self, chat_update=None):
        """Listen on the ``user:pleroma_chat`` stream and dispatch events.

        Returns False immediately when no token is set.  Otherwise it loops
        forever: each received frame is decoded as JSON and, if a handler is
        set, passed to it as ``chat_update(account, event)``.  The loop only
        ends when the websocket (or the handler) raises.
        """
        if not self.token:
            return False
        if chat_update:
            self.chat_update = chat_update
        uri = "wss://"+self.instance+'/api/v1/streaming?access_token='+self.token+'&stream=user:pleroma_chat'
        async with websockets.connect(uri) as websocket:
            while True:
                event = json.loads(await websocket.recv())
                if self.chat_update:
                    self.chat_update(self, event)

    # set streaming event handler
    def setChatUpdate(self, chat_update):
        """Replace the handler called by ``startStreaming``."""
        self.chat_update = chat_update

    # register app if we don't have one
    def register(self):
        """Register an OAuth app unless both client credentials are set.

        Returns the tuple ``(clientID, clientSecret)``.
        """
        if not self.clientID or not self.clientSecret:
            response = self.apiRequest('POST', '/api/v1/apps', {
                'client_name': "plchat",
                'scopes': self._SCOPES,
                'redirect_uris': self._REDIRECT_URI
            })
            self.clientID = response['client_id']
            self.clientSecret = response['client_secret']
        return self.clientID, self.clientSecret

    # get oauth token
    def getToken(self):
        """Obtain tokens with the password grant, answering a TOTP challenge.

        On success stores ``token``, ``refresh_token``, ``flakeid`` and
        ``acct`` and returns the token response.

        Raises TypeError when the server demands TOTP but ``totpFunc`` is not
        callable, and KeyError when the response carries no ``access_token``
        (for example because the server returned an error).
        """
        response = self.apiRequest('POST', '/oauth/token', {
            'grant_type': 'password',
            'username': self.username,
            'password': self.password,
            'client_id': self.clientID,
            'client_secret': self.clientSecret,
            'scopes': self._SCOPES,
            'redirect_uris': self._REDIRECT_URI
        })
        # 'in' covers both a plain 'totp' string and a list containing it.
        if response.get('error') == 'mfa_required' and 'totp' in response['supported_challenge_types']:
            if not callable(self.totpFunc):
                raise TypeError('server requires a TOTP code but no totpFunc was provided')
            mfa_code, code_type = self.totpFunc(self, response['supported_challenge_types'])
            response = self.apiRequest('POST', '/oauth/mfa/challenge', {
                'client_id': self.clientID,
                'client_secret': self.clientSecret,
                'mfa_token': response['mfa_token'],
                'challenge_type': code_type,
                'code': mfa_code
            })
        self.token = response['access_token']
        self.refresh_token = response.get('refresh_token')
        self._rememberAccount(self.apiRequest('GET', self._VERIFY_ROUTE))
        return response

    # Ensure we have have some credentials
    def login(self):
        """Make sure the account holds a working token.

        * Current token accepted: returns
          ``{'access_token': ..., 'refresh_token': ...}``.
        * Token rejected and a refresh token is set: refreshes, then returns
          the verify_credentials response.
        * Otherwise (no refresh token, or the refresh was rejected): returns
          the result of ``getToken()``.
        """
        check = self.apiRequest('GET', self._VERIFY_ROUTE)
        if 'id' in check:
            self._rememberAccount(check)
        if 'error' not in check:
            return {'access_token': self.token, 'refresh_token': self.refresh_token}
        if self.refresh_token:
            refreshed = self.apiRequest('POST', '/oauth/token', {
                'grant_type': 'refresh_token',
                'refresh_token': self.refresh_token,
                'client_id': self.clientID,
                'client_secret': self.clientSecret,
                'scopes': self._SCOPES,
                'redirect_uris': self._REDIRECT_URI
            })
            if 'access_token' in refreshed:
                self.token = refreshed['access_token']
                # Keep the old refresh token if the server did not send a new one.
                self.refresh_token = refreshed.get('refresh_token', self.refresh_token)
                account = self.apiRequest('GET', self._VERIFY_ROUTE)
                self._rememberAccount(account)
                return account
            # Refresh token rejected: fall back to the password grant below.
        return self.getToken()

    # internal: remember who we are logged in as
    def _rememberAccount(self, account):
        """Store a verify_credentials response (raises KeyError without 'id')."""
        self.flakeid = account['id']
        self.acct = account

    #pass True to always get new info
    def getInstanceInfo(self, f=False):
        """Return ``/api/v1/instance`` data, cached unless ``f`` is true."""
        if f or not self._instanceInfo:
            self._instanceInfo = self.apiRequest('GET', '/api/v1/instance')
        return self._instanceInfo

    # enter an account name (eg lain@lain.com) get back a flakeid
    # get back None if we couldn't find it
    def getAcctID(self, account):
        """Return the ``id`` of the account named ``account``, or None."""
        info = self.getAcctInfo(account)
        return None if info is None else info['id']

    # enter an account name (eg lain@lain.com) get back all account info
    # get back None if we couldn't find it
    def getAcctInfo(self, account):
        """Search for ``account`` and return the exactly matching entry.

        Returns None when no result's ``acct`` field equals ``account`` or
        when the server answered with something other than a result list
        (for example an error object).
        """
        response = self.apiRequest('GET', '/api/v1/accounts/search', {'q': account, 'limit':1, 'resolve': True})
        if not isinstance(response, list):
            return None
        for candidate in response:
            if isinstance(candidate, dict) and candidate.get('acct') == account:
                return candidate
        return None

    # enter a flakeid, get back a chat. will be created if not existing
    def addChat(self, flakeid):
        """POST to the chat-by-account-id route for ``flakeid``."""
        return self.apiRequest('POST', self._CHATS_ROUTE + '/by-account-id/' + str(flakeid))

    # get a chat
    # this is the *chat* id, not the flakeid
    def getChat(self, cid):
        """Fetch the chat with chat id ``cid``."""
        return self.apiRequest('GET', self._CHATS_ROUTE + '/' + str(cid))

    # optional page parameter, index of 0
    # limit is number of items per page
    # if the server is too old to support pagination you will get back a list of all chats
    def listChats(self, page=0, limit=20, with_muted=True):
        """List chats, paginated through the v2 route when available.

        The v2 route is queried with ``offset = page * limit``.  If it
        answers ``{'error': 'Not implemented'}`` that fact is remembered and
        this and all later calls use the unpaginated v1 route instead.
        """
        if not self._chatFlag:
            chats = self.apiRequest('GET', '/api/v2/pleroma/chats', {'with_muted': with_muted, 'offset': page * limit, 'limit': limit})
            if not (isinstance(chats, dict) and chats.get('error') == 'Not implemented'):
                return chats
            self._chatFlag = True
        return self.apiRequest('GET', self._CHATS_ROUTE)

    # list pages in chat,
    # optional page parameter, index of 0
    # limit is number of items per page
    def getMessages(self, cid, page=0, limit=20, past=None):
        """Fetch one page of messages from chat ``cid``.

        ``offset`` is ``page * limit``; a truthy ``past`` is sent as
        ``max_id``.
        """
        query = {'offset': page * limit, 'limit': limit}
        if past:
            query['max_id'] = past
        return self.apiRequest('GET', self._CHATS_ROUTE + '/' + str(cid) + '/messages', query)

    # send a message to specified chat
    # either content or media id must be specified
    def sendMessage(self, cid, content=None, media=None):
        """Post a message to chat ``cid``.

        When neither ``content`` nor ``media`` is given no request is made
        and a fixed error string is returned (kept unchanged for existing
        callers).
        """
        if not content and not media:
            return 'You fucked up.'
        return self.apiRequest('POST', self._CHATS_ROUTE + '/' + str(cid) + '/messages', {'content': content, 'media_id': media})

    # upload a piece of media, get back an object describing it
    # the id attribute can be used with sendMessage()
    def uploadMedia(self, file):
        """Upload the file at path ``file`` to ``/api/v1/media``."""
        with open(file, 'rb') as handle:
            return self.apiRequest('POST', '/api/v1/media', None, files={'file': handle})

    # mark all messages in a chat (up to last_read_id) as read
    def markChatRead(self, cid, last_read_id):
        """POST ``last_read_id`` to the chat's ``/read`` route."""
        return self.apiRequest('POST', self._CHATS_ROUTE + '/' + str(cid) + '/read', {'last_read_id': last_read_id})

    # enter a chat id and a message id, get back the deleted message on success
    def deleteMessage(self, cid, mid):
        """DELETE message ``mid`` in chat ``cid``."""
        return self.apiRequest('DELETE', self._CHATS_ROUTE + '/' + str(cid) + '/messages/' + str(mid))

    # internal function
    def apiRequest(self, method, route, data=None, **kwargs):
        """Send one HTTP request to the instance and return the decoded JSON.

        method -- 'GET', 'POST' or 'DELETE'.
        route  -- path appended to ``self.url``.
        data   -- query parameters for GET, request body otherwise.
        kwargs -- forwarded to ``requests``; ``timeout`` defaults to 300 and
                  caller-supplied ``headers`` are merged with the bearer
                  token header.

        Raises ValueError for any other method, and whatever ``requests``
        raises for transport errors or a non-JSON response body.
        """
        if method not in ('GET', 'POST', 'DELETE'):
            raise ValueError('unsupported HTTP method: %r' % (method,))
        headers = dict(kwargs.pop('headers', None) or {})
        # Only send a bearer header when there is a token to put in it.
        if self.token:
            headers.setdefault('Authorization', 'Bearer ' + self.token)
        kwargs.setdefault('timeout', 300)
        url = self.url + route
        if method == 'GET':
            response = requests.get(url, params=data, headers=headers, **kwargs)
        elif method == 'POST':
            response = requests.post(url, data=data, headers=headers, **kwargs)
        else:
            # requests.delete() takes only the URL positionally.
            response = requests.delete(url, data=data, headers=headers, **kwargs)
        return response.json()