################################################################################
# BSD 2-Clause License
#
# Copyright (c) 2021, Knott Eye
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
################################################################################

import re, requests, os.path, websockets, json
|
|
|
|
class Account():
    """Client for a single account on a Pleroma instance, centred on the chat API.

    Wraps the HTTP endpoints used for app registration, OAuth token handling,
    account lookup, chats, chat messages and media upload, plus a websocket
    stream of chat updates. All HTTP calls go through ``apiRequest`` and return
    the decoded JSON body of the response.
    """

    # OAuth values sent with every app-registration / token request.
    _OAUTH_SCOPES = "read write follow push"
    _OAUTH_REDIRECT = "urn:ietf:wg:oauth:2.0:oob"

    def __init__(self, instance, username=' ', password='', clientID=None, clientSecret=None, token=None, refresh_token=None):
        """Store connection details and credentials; performs no network I/O.

        instance      -- host of the server; an ``http://`` / ``https://`` prefix
                         and trailing slashes are removed.
        username      -- account name; a single leading ``@`` is dropped.
        password      -- password used by ``getToken``.
        clientID      -- OAuth client id, or None to obtain one via ``register``.
        clientSecret  -- OAuth client secret, or None (see ``register``).
        token         -- existing access token; stored as '' when not given.
        refresh_token -- existing refresh token, used by ``login``.
        """
        # Normalise to a bare host so ``url`` and the websocket URI can be
        # built by simple concatenation. rstrip() is safe on an empty string,
        # unlike indexing the last character.
        self.instance = re.sub('https?://', '', instance).rstrip('/')
        self.url = 'https://'+self.instance

        # startswith() instead of username[0] so an empty username is accepted.
        if username and username.startswith('@'):
            self.username = username[1:]
        else:
            self.username = username

        self.password = password

        self.clientID = clientID
        self.clientSecret = clientSecret
        self.token = token if token else ''
        self.refresh_token = refresh_token

        # True once the server reported the v2 chat listing as unsupported.
        self._chatFlag = False
        # Cached result of getInstanceInfo().
        self._instanceInfo = None
        # Id and full account object of the logged-in user, set on login.
        self.flakeid = None
        self.acct = None

        # Callback invoked as chat_update(account, event) while streaming.
        self.chat_update = None

    async def startStreaming(self, chat_update=None):
        """Stream chat updates over a websocket and dispatch them to the handler.

        Returns False immediately when no access token is set. Otherwise opens
        the ``user:pleroma_chat`` stream and loops forever, decoding each
        received frame as JSON and calling ``self.chat_update(self, event)``
        when a handler is set. The loop has no exit condition of its own; it
        ends only when an exception propagates (for example from ``recv``).

        chat_update -- optional handler; replaces the current one when truthy.
        """
        if not self.token:
            return False
        if chat_update:
            self.chat_update = chat_update
        uri = "wss://"+self.instance+'/api/v1/streaming?access_token='+self.token+'&stream=user:pleroma_chat'
        async with websockets.connect(uri) as websocket:
            while True:
                event = json.loads(await websocket.recv())
                # The handler is re-read on every event so setChatUpdate()
                # takes effect while the stream is running.
                if self.chat_update:
                    self.chat_update(self, event)

    def setChatUpdate(self, chat_update):
        """Set the streaming event handler, called as ``chat_update(account, event)``."""
        self.chat_update = chat_update

    def register(self):
        """Register an OAuth app if the client id or secret is missing.

        Returns the ``(clientID, clientSecret)`` tuple, newly obtained or the
        values already stored. Raises KeyError if the registration response
        lacks ``client_id`` / ``client_secret``.
        """
        if not self.clientID or not self.clientSecret:
            request_data = {
                'client_name': "plchat",
                'scopes': self._OAUTH_SCOPES,
                'redirect_uris': self._OAUTH_REDIRECT
            }
            response = self.apiRequest('POST', '/api/v1/apps', request_data)
            self.clientID, self.clientSecret = (response['client_id'], response['client_secret'])
        return self.clientID, self.clientSecret

    def getToken(self):
        """Obtain an OAuth token with the password grant and load the account.

        Stores the access and refresh tokens, then fetches the account via
        ``verify_credentials`` and stores it in ``acct`` / ``flakeid``.
        Returns the token response. Raises KeyError when an expected key is
        missing from either response.
        """
        request_data = {
            'grant_type': 'password',
            'username': self.username,
            'password': self.password,
            'client_id': self.clientID,
            'client_secret': self.clientSecret,
            'scopes': self._OAUTH_SCOPES,
            'redirect_uris': self._OAUTH_REDIRECT
        }
        response = self.apiRequest('POST', '/oauth/token', request_data)
        self.token = response['access_token']
        self.refresh_token = response['refresh_token']
        r = self.apiRequest('GET', '/api/v1/accounts/verify_credentials')
        self.flakeid = r['id']
        self.acct = r
        return response

    def login(self):
        """Ensure we have some working credentials.

        Verifies the current token first. If the server answers with an error:
        without a refresh token, falls back to ``getToken``; with one, tries
        the refresh grant and falls back to ``getToken`` if that is rejected.

        The return value depends on the path taken: the current
        ``{'access_token', 'refresh_token'}`` pair when the token was already
        valid, the ``getToken`` response, or the account object after a
        successful refresh.
        """
        r = self.apiRequest('GET', '/api/v1/accounts/verify_credentials')
        if 'id' in r:
            self.flakeid = r['id']
            self.acct = r
        if 'error' not in r:
            return {'access_token': self.token, 'refresh_token': self.refresh_token}

        if not self.refresh_token:
            return self.getToken()

        r = self.apiRequest('POST', '/oauth/token', {
            'grant_type': 'refresh_token',
            'refresh_token': self.refresh_token,
            'client_id': self.clientID,
            'client_secret': self.clientSecret,
            'scopes': self._OAUTH_SCOPES,
            'redirect_uris': self._OAUTH_REDIRECT
        })
        if 'access_token' not in r:
            # The refresh token was not accepted; use the password grant
            # rather than failing with a KeyError on the error response.
            return self.getToken()
        self.token = r['access_token']
        self.refresh_token = r['refresh_token']
        r = self.apiRequest('GET', '/api/v1/accounts/verify_credentials')
        self.acct = r
        self.flakeid = r['id']
        return r

    def getInstanceInfo(self, f=False):
        """Return the instance information, cached after the first request.

        f -- pass True to always fetch fresh info.
        """
        if f or not self._instanceInfo:
            self._instanceInfo = self.apiRequest('GET', '/api/v1/instance')
        return self._instanceInfo

    def getAcctID(self, account):
        """Look up an account name (e.g. lain@lain.com) and return its flakeid.

        Returns None if the account could not be found.
        """
        info = self.getAcctInfo(account)
        return info['id'] if info else None

    def getAcctInfo(self, account):
        """Look up an account name (e.g. lain@lain.com) and return all its info.

        Returns None if no search result has exactly this ``acct`` value, or
        if the search did not return a list (such as an error object).
        """
        response = self.apiRequest('GET', '/api/v1/accounts/search', {'q': account, 'limit': 1, 'resolve': True})
        if not isinstance(response, list):
            return None
        for acct in response:
            if acct['acct'] == account:
                return acct
        return None

    def addChat(self, flakeid):
        """Return the chat with the given account's flakeid.

        Per the original author's note the chat is created if it does not
        exist yet.
        """
        return self.apiRequest('POST', '/api/v1/pleroma/chats/by-account-id/'+flakeid)

    def getChat(self, cid):
        """Return a chat. ``cid`` is the *chat* id, not the flakeid."""
        return self.apiRequest('GET', '/api/v1/pleroma/chats/'+cid)

    def listChats(self, page=0, limit=20, with_muted=True):
        """List chats, paginated when the server supports it.

        page       -- zero-based page index.
        limit      -- number of items per page.
        with_muted -- forwarded to the v2 endpoint as ``with_muted``.

        If the server answers the v2 endpoint with ``Not implemented``, that is
        remembered and the unpaginated v1 endpoint is used from then on, which
        per the original author's note returns all chats.
        """
        if not self._chatFlag:
            r = self.apiRequest('GET', '/api/v2/pleroma/chats', {'with_muted': with_muted, 'offset': page * limit, 'limit': limit})
            if not ('error' in r and r['error'] == 'Not implemented'):
                return r
            self._chatFlag = True
        return self.apiRequest('GET', '/api/v1/pleroma/chats')

    def getMessages(self, cid, page=0, limit=20, past=None):
        """List one page of messages in a chat.

        cid   -- chat id.
        page  -- zero-based page index.
        limit -- number of items per page.
        past  -- optional message id, sent as ``max_id`` when truthy.
        """
        params = {'offset': page * limit, 'limit': limit}
        if past:
            params['max_id'] = past
        return self.apiRequest('GET', '/api/v1/pleroma/chats/'+cid+'/messages', params)

    def sendMessage(self, cid, content=None, media=None):
        """Send a message to the specified chat.

        Either ``content`` or a ``media`` id must be specified. When neither
        is given no request is made and a fixed error string is returned
        instead of a response object (kept unchanged for existing callers).
        """
        if not content and not media:
            return 'You fucked up.'
        return self.apiRequest('POST', '/api/v1/pleroma/chats/'+cid+'/messages', {'content': content, 'media_id': media})

    def uploadMedia(self, file):
        """Upload a piece of media and return the object describing it.

        file -- path of the file to upload; opened in binary mode.

        The ``id`` attribute of the result can be used with ``sendMessage``.
        """
        with open(file, 'rb') as f:
            return self.apiRequest('POST', '/api/v1/media', None, files={'file': f})

    def markChatRead(self, cid, last_read_id):
        """Mark all messages in a chat (up to ``last_read_id``) as read."""
        return self.apiRequest('POST', '/api/v1/pleroma/chats/'+cid+'/read', {'last_read_id': last_read_id})

    def deleteMessage(self, cid, mid):
        """Delete message ``mid`` from chat ``cid``.

        Per the original author's note the deleted message is returned on
        success.
        """
        return self.apiRequest('DELETE', '/api/v1/pleroma/chats/'+cid+'/messages/'+mid)

    def apiRequest(self, method, route, data=None, **kwargs):
        """Internal helper: send an authenticated request and decode the JSON.

        method -- 'GET', 'POST' or 'DELETE'; any other value returns None
                  without making a request.
        route  -- path appended to ``self.url``.
        data   -- sent as query parameters for GET and as the request body
                  for POST and DELETE.
        kwargs -- forwarded to the ``requests`` call (e.g. ``files=``).

        Uses a 300 second timeout and a Bearer ``Authorization`` header built
        from ``self.token``.
        """
        url = self.url + route
        headers = {'Authorization': 'Bearer '+self.token}
        if method == 'GET':
            response = requests.get(url, params=data, timeout=300, headers=headers, **kwargs)
        elif method == 'POST':
            response = requests.post(url, data=data, timeout=300, headers=headers, **kwargs)
        elif method == 'DELETE':
            # requests.delete() takes only the URL positionally, so the body
            # has to be passed by keyword.
            response = requests.delete(url, data=data, timeout=300, headers=headers, **kwargs)
        else:
            return None
        return response.json()