You've already forked snikket-web-portal
This is needed because jinja2 had an update which caused the portal to
not work at all:
```
ImportError: cannot import name 'escape' from 'jinja2'
```
Quart needed updating for that.
This update required a lot of typefixes. Apparently, the "canned"
responses (like redirect) are now plain werkzeug responses, while
quart.Response does not inherit from werkzeug.Response (otherwise, we
could've changed the type annotations to werkzeug.Response everywhere,
but that doesn't work because a quart.Response is not a
werkzeug.Response).
P.S.: This time, I *did* check that avatar uploads don't break (see
b007afc).
374 lines
11 KiB
Python
374 lines
11 KiB
Python
import base64
|
||
import typing
|
||
|
||
import xml.etree.ElementTree as ET
|
||
|
||
from quart import abort
|
||
import werkzeug.exceptions
|
||
|
||
|
||
TAG_XMPP_ERROR = "error"
|
||
|
||
NS_XMPP_ERROR_CONDITION = "urn:ietf:params:xml:ns:xmpp-stanzas"
|
||
TAG_XMPP_ERROR_ITEM_NOT_FOUND = \
|
||
"{{{}}}item-not-found".format(NS_XMPP_ERROR_CONDITION)
|
||
TAG_XMPP_ERROR_TEXT = "{{{}}}text".format(NS_XMPP_ERROR_CONDITION)
|
||
|
||
ERROR_CODE_MAP = {
|
||
TAG_XMPP_ERROR_ITEM_NOT_FOUND: 404,
|
||
}
|
||
|
||
NS_PUBSUB = "http://jabber.org/protocol/pubsub"
|
||
NS_PUBSUB_OWNER = "http://jabber.org/protocol/pubsub#owner"
|
||
TAG_PUBSUB = "{{{}}}pubsub".format(NS_PUBSUB)
|
||
TAG_PUBSUB_OWNER = "{{{}}}pubsub".format(NS_PUBSUB_OWNER)
|
||
TAG_PUBSUB_ITEM = "{{{}}}item".format(NS_PUBSUB)
|
||
TAG_PUBSUB_ITEMS = "{{{}}}items".format(NS_PUBSUB)
|
||
TAG_PUBSUB_CONFIGURE = "{{{}}}configure".format(NS_PUBSUB_OWNER)
|
||
|
||
NS_USER_NICKNAME = "http://jabber.org/protocol/nick"
|
||
NODE_USER_NICKNAME = NS_USER_NICKNAME
|
||
TAG_USER_NICKNAME_NICK = "{{{}}}nick".format(NS_USER_NICKNAME)
|
||
|
||
NODE_USER_AVATAR_METADATA = "urn:xmpp:avatar:metadata"
|
||
NS_USER_AVATAR_METADATA = "urn:xmpp:avatar:metadata"
|
||
TAG_USER_AVATAR_METADATA = "{{{}}}metadata".format(NS_USER_AVATAR_METADATA)
|
||
TAG_USER_AVATAR_METADATA_INFO = "{{{}}}info".format(NS_USER_AVATAR_METADATA)
|
||
|
||
NODE_USER_AVATAR_DATA = "urn:xmpp:avatar:data"
|
||
NS_USER_AVATAR_DATA = "urn:xmpp:avatar:data"
|
||
TAG_USER_AVATAR_DATA = "{{{}}}data".format(NS_USER_AVATAR_DATA)
|
||
|
||
NODE_VCARD = "urn:xmpp:vcard4"
|
||
|
||
NS_DATA_FORM = "jabber:x:data"
|
||
TAG_DATA_FORM_X = "{{{}}}x".format(NS_DATA_FORM)
|
||
TAG_DATA_FORM_FIELD = "{{{}}}field".format(NS_DATA_FORM)
|
||
TAG_DATA_FORM_VALUE = "{{{}}}value".format(NS_DATA_FORM)
|
||
|
||
FORM_NODE_CONFIG = "http://jabber.org/protocol/pubsub#node_config"
|
||
FORM_FIELD_PUBSUB_ACCESS_MODEL = "pubsub#access_model"
|
||
|
||
|
||
SimpleJID = typing.Tuple[typing.Optional[str], str, typing.Optional[str]]
|
||
T = typing.TypeVar("T")
|
||
|
||
|
||
def split_jid(s: str) -> SimpleJID:
|
||
resource: typing.Optional[str]
|
||
localpart: typing.Optional[str]
|
||
bare, sep, resource = s.partition("/")
|
||
if not sep:
|
||
resource = None
|
||
localpart, sep, domain = bare.partition("@")
|
||
if not sep:
|
||
domain = localpart
|
||
localpart = None
|
||
return localpart, domain, resource
|
||
|
||
|
||
def raise_iq_error(err: ET.Element) -> None:
|
||
err_condition_el = None
|
||
# err_text_el = None
|
||
# err_app_def_condition_el = None
|
||
|
||
for el in err:
|
||
if el.tag == TAG_XMPP_ERROR_TEXT:
|
||
# err_text_el = el
|
||
continue
|
||
elif el.tag.startswith("{{{}}}".format(NS_XMPP_ERROR_CONDITION)):
|
||
err_condition_el = el
|
||
# else:
|
||
# err_app_def_condition_el = el
|
||
|
||
if err_condition_el is None:
|
||
condition_tag = "undefined-condition"
|
||
else:
|
||
condition_tag = err_condition_el.tag
|
||
|
||
# print(err_text_el, err_condition_el, err_app_def_condition_el)
|
||
abort(ERROR_CODE_MAP.get(condition_tag, 500), condition_tag)
|
||
|
||
|
||
def extract_iq_reply(
|
||
tree: ET.Element,
|
||
require_tag: typing.Optional[str] = None,
|
||
) -> typing.Optional[ET.Element]:
|
||
iq_type = tree.get("type")
|
||
if iq_type == "error":
|
||
error = tree.find(TAG_XMPP_ERROR)
|
||
if error is not None:
|
||
raise_iq_error(error)
|
||
raise abort(500, "malformed reply")
|
||
elif iq_type == "result":
|
||
if len(tree) > 0:
|
||
reply_el = tree[0]
|
||
if require_tag and reply_el.tag != require_tag:
|
||
raise abort(500, "unexpected reply")
|
||
return reply_el
|
||
if require_tag:
|
||
raise abort(500, "unexpected reply")
|
||
return None
|
||
else:
|
||
raise abort(500, "unsupported reply")
|
||
|
||
|
||
def make_password_change_request(jid: str, password: str) -> ET.Element:
|
||
username, domain, _ = split_jid(jid)
|
||
# XXX: this is due to a problem with mod_rest / mod_register in prosody:
|
||
# it doesn’t recognize the password change stanza unless we send it to
|
||
# the account JID.
|
||
req = ET.Element("iq", to="{}@{}".format(username, domain), type="set")
|
||
q = ET.SubElement(req, "query", xmlns="jabber:iq:register")
|
||
ET.SubElement(q, "username").text = username
|
||
ET.SubElement(q, "password").text = password
|
||
return req
|
||
|
||
|
||
def make_pubsub_item_put_request(
|
||
to: str, node: str,
|
||
id_: typing.Optional[str] = None,
|
||
) -> typing.Tuple[ET.Element, ET.Element]:
|
||
req = ET.Element("iq", type="set", to=to)
|
||
q = ET.SubElement(req, "pubsub", xmlns=NS_PUBSUB)
|
||
publish = ET.SubElement(q, "publish", node=node)
|
||
item = ET.SubElement(publish, "item")
|
||
if id_ is not None:
|
||
item.set("id", id_)
|
||
return req, item
|
||
|
||
|
||
def make_nickname_set_request(to: str, nickname: str) -> ET.Element:
|
||
req, item = make_pubsub_item_put_request(
|
||
to,
|
||
NODE_USER_NICKNAME,
|
||
)
|
||
ET.SubElement(item, "nick", xmlns=NS_USER_NICKNAME).text = nickname
|
||
return req
|
||
|
||
|
||
def make_pubsub_item_request(
|
||
to: str,
|
||
node: str,
|
||
id_: typing.Optional[str] = None,
|
||
) -> ET.Element:
|
||
req = ET.Element("iq", type="get", to=to)
|
||
q = ET.SubElement(req, "pubsub", xmlns=NS_PUBSUB)
|
||
items = ET.SubElement(q, "items", node=node)
|
||
if id_ is not None:
|
||
ET.SubElement(items, "item", id=id_)
|
||
else:
|
||
items.set("max_items", "1")
|
||
|
||
return req
|
||
|
||
|
||
def make_nickname_get_request(to: str) -> ET.Element:
|
||
return make_pubsub_item_request(to, NODE_USER_NICKNAME)
|
||
|
||
|
||
def make_avatar_metadata_request(to: str) -> ET.Element:
|
||
return make_pubsub_item_request(to, NODE_USER_AVATAR_METADATA)
|
||
|
||
|
||
def make_avatar_data_request(to: str, sha1: str) -> ET.Element:
|
||
return make_pubsub_item_request(to, NODE_USER_AVATAR_DATA, id_=sha1)
|
||
|
||
|
||
def make_avatar_data_set_request(
|
||
to: str,
|
||
data: bytes,
|
||
id_: str,
|
||
) -> ET.Element:
|
||
req, item = make_pubsub_item_put_request(
|
||
to,
|
||
NODE_USER_AVATAR_DATA,
|
||
id_=id_,
|
||
)
|
||
ET.SubElement(item, "data", xmlns=NS_USER_AVATAR_DATA).text = \
|
||
base64.b64encode(data).decode("ascii")
|
||
return req
|
||
|
||
|
||
def make_avatar_metadata_set_request(
|
||
to: str,
|
||
mimetype: str,
|
||
id_: str,
|
||
size: int,
|
||
width: typing.Optional[int] = None,
|
||
height: typing.Optional[int] = None,
|
||
) -> ET.Element:
|
||
req, item = make_pubsub_item_put_request(
|
||
to,
|
||
NODE_USER_AVATAR_METADATA,
|
||
id_=id_,
|
||
)
|
||
metadata_wrap = ET.SubElement(
|
||
item,
|
||
"metadata", xmlns=NS_USER_AVATAR_METADATA)
|
||
|
||
attr: typing.Dict[str, str] = {
|
||
"id": id_,
|
||
"bytes": str(size),
|
||
"type": mimetype,
|
||
}
|
||
if width is not None:
|
||
attr["width"] = str(width)
|
||
if height is not None:
|
||
attr["height"] = str(height)
|
||
|
||
ET.SubElement(
|
||
metadata_wrap,
|
||
"info",
|
||
xmlns=NS_USER_AVATAR_METADATA,
|
||
**attr, # type: ignore
|
||
)
|
||
return req
|
||
|
||
|
||
def _require_child(t: ET.Element, tag: str) -> ET.Element:
|
||
el = t.find(tag)
|
||
if el is None:
|
||
raise abort(500, "malformed reply: missing {}".format(tag))
|
||
return el
|
||
|
||
|
||
def extract_pubsub_item_get_reply(
|
||
iq_tree: ET.Element,
|
||
payload_tag: str,
|
||
) -> typing.Optional[ET.Element]:
|
||
try:
|
||
pubsub = extract_iq_reply(iq_tree, TAG_PUBSUB)
|
||
except werkzeug.exceptions.NotFound:
|
||
return None
|
||
|
||
if pubsub is None:
|
||
# no payload in IQ reply
|
||
raise abort(500, "malformed reply")
|
||
|
||
items = _require_child(pubsub, TAG_PUBSUB_ITEMS)
|
||
if len(items) == 0:
|
||
return None
|
||
|
||
return _require_child(_require_child(items, TAG_PUBSUB_ITEM), payload_tag)
|
||
|
||
|
||
def extract_nickname_get_reply(iq_tree: ET.Element) -> typing.Optional[str]:
|
||
nick = extract_pubsub_item_get_reply(iq_tree, TAG_USER_NICKNAME_NICK)
|
||
if nick is None:
|
||
return None
|
||
return nick.text
|
||
|
||
|
||
def extract_avatar_metadata_get_reply(
|
||
iq_tree: ET.Element,
|
||
) -> typing.Optional[typing.MutableMapping[str, typing.Any]]:
|
||
metadata = extract_pubsub_item_get_reply(iq_tree, TAG_USER_AVATAR_METADATA)
|
||
if metadata is None:
|
||
return None
|
||
|
||
if len(metadata) != 1 or metadata[0].tag != TAG_USER_AVATAR_METADATA_INFO:
|
||
# raise an error instead?
|
||
return None
|
||
|
||
info = metadata[0]
|
||
attrs = info.attrib
|
||
result: typing.MutableMapping[str, typing.Optional[str]] = {
|
||
"sha1": attrs["id"],
|
||
"type": attrs.get("type", "image/png"),
|
||
}
|
||
|
||
def extract_optional(
|
||
key: str,
|
||
type_: typing.Callable[[str], typing.Any] = lambda x: int(x),
|
||
) -> None:
|
||
try:
|
||
result[key] = type_(attrs[key])
|
||
except (KeyError, ValueError, TypeError):
|
||
pass
|
||
|
||
extract_optional("width")
|
||
extract_optional("height")
|
||
extract_optional("bytes")
|
||
|
||
return result
|
||
|
||
|
||
def extract_avatar_data_get_reply(
|
||
iq_tree: ET.Element,
|
||
) -> typing.Optional[bytes]:
|
||
data = extract_pubsub_item_get_reply(iq_tree, TAG_USER_AVATAR_DATA)
|
||
if data is None or data.text is None:
|
||
return None
|
||
return base64.b64decode(data.text)
|
||
|
||
|
||
def make_pubsub_node_config_put_request(
|
||
to: str, node: str,
|
||
id_: typing.Optional[str] = None,
|
||
) -> typing.Tuple[ET.Element, ET.Element]:
|
||
req = ET.Element("iq", type="set", to=to)
|
||
q = ET.SubElement(req, "pubsub", xmlns=NS_PUBSUB_OWNER)
|
||
configure = ET.SubElement(q, "configure", node=node)
|
||
form = ET.SubElement(configure, "x",
|
||
xmlns=NS_DATA_FORM,
|
||
type="submit")
|
||
form_type = ET.SubElement(form, "field", var="FORM_TYPE", type="hidden")
|
||
ET.SubElement(form_type, "value").text = FORM_NODE_CONFIG
|
||
return req, form
|
||
|
||
|
||
def make_pubsub_node_config_get_request(
|
||
to: str, node: str,
|
||
) -> ET.Element:
|
||
req = ET.Element("iq", type="get", to=to)
|
||
q = ET.SubElement(req, "pubsub", xmlns=NS_PUBSUB_OWNER)
|
||
ET.SubElement(q, "configure", node=node)
|
||
return req
|
||
|
||
|
||
def add_form_field(
|
||
form: ET.Element,
|
||
var: str,
|
||
values: typing.Union[str, typing.Collection[str]],
|
||
type_: typing.Optional[str] = None,
|
||
) -> ET.Element:
|
||
if isinstance(values, str):
|
||
values = [values]
|
||
field = ET.SubElement(form, "field", var=var)
|
||
if type_ is not None:
|
||
field.set("type", type_)
|
||
for v in values:
|
||
ET.SubElement(field, "value").text = v
|
||
return field
|
||
|
||
|
||
def make_pubsub_access_model_put_request(
|
||
to: str,
|
||
node: str,
|
||
new_access_model: str,
|
||
) -> ET.Element:
|
||
req, form = make_pubsub_node_config_put_request(to, node)
|
||
add_form_field(form, FORM_FIELD_PUBSUB_ACCESS_MODEL, new_access_model)
|
||
return req
|
||
|
||
|
||
def extract_pubsub_node_config_get_reply(
|
||
iq_tree: ET.Element,
|
||
) -> typing.Mapping[str, typing.Sequence[str]]:
|
||
payload = extract_iq_reply(iq_tree)
|
||
if payload is None:
|
||
raise ValueError("invalid reply")
|
||
form = _require_child(_require_child(payload, TAG_PUBSUB_CONFIGURE),
|
||
TAG_DATA_FORM_X)
|
||
result: typing.MutableMapping[str, typing.List[str]] = {}
|
||
for child in form.findall(TAG_DATA_FORM_FIELD):
|
||
var = child.get("var")
|
||
if var is None:
|
||
continue
|
||
values = [value_tag.text or ""
|
||
for value_tag in child.findall(TAG_DATA_FORM_VALUE)]
|
||
result[var] = values
|
||
|
||
return result
|