Files
snikket-web-portal/snikket_web/xmpputil.py
Jonas Schäfer 12276337c1 Partially log requests sent to the API
Payloads containing sensitive content (such as passwords and
tokens) should be hidden.
2020-04-30 16:14:14 +02:00

244 lines
6.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import base64
import binascii
import typing
import xml.etree.ElementTree as ET
from quart import abort
import quart.exceptions
TAG_XMPP_ERROR = "error"
NS_XMPP_ERROR_CONDITION = "urn:ietf:params:xml:ns:xmpp-stanzas"
TAG_XMPP_ERROR_ITEM_NOT_FOUND = "{{{}}}item-not-found".format(NS_XMPP_ERROR_CONDITION)
TAG_XMPP_ERROR_TEXT = "{{{}}}text".format(NS_XMPP_ERROR_CONDITION)
ERROR_CODE_MAP = {
TAG_XMPP_ERROR_ITEM_NOT_FOUND: 404,
}
NS_PUBSUB = "http://jabber.org/protocol/pubsub"
TAG_PUBSUB = "{{{}}}pubsub".format(NS_PUBSUB)
TAG_PUBSUB_ITEM = "{{{}}}item".format(NS_PUBSUB)
TAG_PUBSUB_ITEMS = "{{{}}}items".format(NS_PUBSUB)
NS_USER_NICKNAME = "http://jabber.org/protocol/nick"
NODE_USER_NICKNAME = NS_USER_NICKNAME
TAG_USER_NICKNAME_NICK = "{{{}}}nick".format(NS_USER_NICKNAME)
NODE_USER_AVATAR_METADATA = "urn:xmpp:avatar:metadata"
NS_USER_AVATAR_METADATA = "urn:xmpp:avatar:metadata"
TAG_USER_AVATAR_METADATA = "{{{}}}metadata".format(NS_USER_AVATAR_METADATA)
TAG_USER_AVATAR_METADATA_INFO = "{{{}}}info".format(NS_USER_AVATAR_METADATA)
NODE_USER_AVATAR_DATA = "urn:xmpp:avatar:data"
NS_USER_AVATAR_DATA = "urn:xmpp:avatar:data"
TAG_USER_AVATAR_DATA = "{{{}}}data".format(NS_USER_AVATAR_DATA)
def split_jid(s):
bare, sep, resource = s.partition("/")
if not sep:
resource = None
localpart, sep, domain = bare.partition("@")
if not sep:
domain = localpart
localpart = None
return localpart, domain, resource
def raise_iq_error(err: ET.Element):
err_condition_el = None
err_text_el = None
err_app_def_condition_el = None
for el in err:
if el.tag == TAG_XMPP_ERROR_TEXT:
err_text_el = el
elif el.tag.startswith("{{{}}}".format(NS_XMPP_ERROR_CONDITION)):
err_condition_el = el
else:
err_app_def_condition_el = el
print(err_text_el, err_condition_el, err_app_def_condition_el)
abort(ERROR_CODE_MAP.get(err_condition_el.tag, 500),
err_condition_el.tag)
def extract_iq_reply(tree: ET.Element,
require_tag: str = None) -> typing.Optional[ET.Element]:
iq_type = tree.get("type")
if iq_type == "error":
error = tree.find(TAG_XMPP_ERROR)
if error is not None:
raise raise_iq_error(error)
raise abort(500, "malformed reply")
elif iq_type == "result":
if len(tree) > 0:
reply_el = tree[0]
if require_tag and reply_el.tag != require_tag:
raise abort(500, "unexpected reply")
return reply_el
if require_tag:
raise abort(500, "unexpected reply")
return None
else:
raise abort(500, "unsupported reply")
def make_password_change_request(jid, password):
username, domain, _ = split_jid(jid)
# XXX: this is due to a problem with mod_rest / mod_register in prosody:
# it doesnt recognize the password change stanza unless we send it to
# the account JID.
req = ET.Element("iq", to="{}@{}".format(username, domain), type="set")
q = ET.SubElement(req, "query", xmlns="jabber:iq:register")
ET.SubElement(q, "username").text = username
ET.SubElement(q, "password").text = password
return req
def make_pubsub_item_put_request(to, node, id_=None):
req = ET.Element("iq", type="set", to=to)
q = ET.SubElement(req, "pubsub", xmlns=NS_PUBSUB)
publish = ET.SubElement(q, "publish", node=node)
item = ET.SubElement(publish, "item")
if id_ is not None:
item.set("id", id_)
return req, item
def make_nickname_set_request(to, nickname):
req, item = make_pubsub_item_put_request(
to,
NODE_USER_NICKNAME,
)
ET.SubElement(item, "nick", xmlns=NS_USER_NICKNAME).text = nickname
return req
def make_pubsub_item_request(to, node, id_=None):
req = ET.Element("iq", type="get", to=to)
q = ET.SubElement(req, "pubsub", xmlns=NS_PUBSUB)
items = ET.SubElement(q, "items", node=node)
if id_ is not None:
ET.SubElement(items, "item", id=id_)
else:
items.set("max_items", "1")
return req
def make_nickname_get_request(to):
return make_pubsub_item_request(to, NODE_USER_NICKNAME)
def make_avatar_metadata_request(to):
return make_pubsub_item_request(to, NODE_USER_AVATAR_METADATA)
def make_avatar_data_request(to, sha1):
return make_pubsub_item_request(to, NODE_USER_AVATAR_DATA, id_=sha1)
def make_avatar_data_set_request(to, data, id_):
req, item = make_pubsub_item_put_request(
to,
NODE_USER_AVATAR_DATA,
id_=id_,
)
ET.SubElement(item, "data", xmlns=NS_USER_AVATAR_DATA).text = \
base64.b64encode(data).decode("ascii")
return req
def make_avatar_metadata_set_request(to, mimetype: str, id_: str, size: int,
width: int = None,
height: int = None):
req, item = make_pubsub_item_put_request(
to,
NODE_USER_AVATAR_METADATA,
id_=id_,
)
metadata_wrap = ET.SubElement(
item,
"metadata", xmlns=NS_USER_AVATAR_METADATA)
attr = {
"id": id_,
"bytes": str(size),
"type": mimetype,
}
if width is not None:
attr["width"] = str(width)
if height is not None:
attr["height"] = str(height)
ET.SubElement(metadata_wrap, "info", xmlns=NS_USER_AVATAR_METADATA, **attr)
return req
def _require_child(t: ET.Element, tag: str) -> ET.Element:
el = t.find(tag)
if el is None:
raise abort(500, "malformed reply")
return el
def extract_pubsub_item_get_reply(iq_tree, payload_tag):
try:
pubsub = extract_iq_reply(iq_tree, TAG_PUBSUB)
except quart.exceptions.NotFound:
return None
items = _require_child(pubsub, TAG_PUBSUB_ITEMS)
if len(items) == 0:
return None
return _require_child(_require_child(items, TAG_PUBSUB_ITEM), payload_tag)
def extract_nickname_get_reply(iq_tree):
nick = extract_pubsub_item_get_reply(iq_tree, TAG_USER_NICKNAME_NICK)
if nick is None:
return None
return nick.text
def extract_avatar_metadata_get_reply(iq_tree):
metadata = extract_pubsub_item_get_reply(iq_tree, TAG_USER_AVATAR_METADATA)
if metadata is None:
return None
if len(metadata) != 1 or metadata[0].tag != TAG_USER_AVATAR_METADATA_INFO:
# raise an error instead?
return None
info = metadata[0]
attrs = info.attrib
result = {
"sha1": attrs["id"],
"type": attrs.get("type", "image/png"),
}
def extract_optional(key, type_=int):
try:
result[key] = type_(attrs[key])
except (KeyError, ValueError, TypeError):
pass
extract_optional("width")
extract_optional("height")
extract_optional("bytes")
return result
def extract_avatar_data_get_reply(iq_tree):
data = extract_pubsub_item_get_reply(iq_tree, TAG_USER_AVATAR_DATA)
if data.text is None:
return None
return base64.b64decode(data.text)