You've already forked snikket-web-portal
@@ -19,9 +19,12 @@ ERROR_CODE_MAP = {
|
||||
}
|
||||
|
||||
NS_PUBSUB = "http://jabber.org/protocol/pubsub"
|
||||
NS_PUBSUB_OWNER = "http://jabber.org/protocol/pubsub#owner"
|
||||
TAG_PUBSUB = "{{{}}}pubsub".format(NS_PUBSUB)
|
||||
TAG_PUBSUB_OWNER = "{{{}}}pubsub".format(NS_PUBSUB_OWNER)
|
||||
TAG_PUBSUB_ITEM = "{{{}}}item".format(NS_PUBSUB)
|
||||
TAG_PUBSUB_ITEMS = "{{{}}}items".format(NS_PUBSUB)
|
||||
TAG_PUBSUB_CONFIGURE = "{{{}}}configure".format(NS_PUBSUB_OWNER)
|
||||
|
||||
NS_USER_NICKNAME = "http://jabber.org/protocol/nick"
|
||||
NODE_USER_NICKNAME = NS_USER_NICKNAME
|
||||
@@ -36,6 +39,16 @@ NODE_USER_AVATAR_DATA = "urn:xmpp:avatar:data"
|
||||
NS_USER_AVATAR_DATA = "urn:xmpp:avatar:data"
|
||||
TAG_USER_AVATAR_DATA = "{{{}}}data".format(NS_USER_AVATAR_DATA)
|
||||
|
||||
NODE_VCARD = "urn:xmpp:vcard4"
|
||||
|
||||
NS_DATA_FORM = "jabber:x:data"
|
||||
TAG_DATA_FORM_X = "{{{}}}x".format(NS_DATA_FORM)
|
||||
TAG_DATA_FORM_FIELD = "{{{}}}field".format(NS_DATA_FORM)
|
||||
TAG_DATA_FORM_VALUE = "{{{}}}value".format(NS_DATA_FORM)
|
||||
|
||||
FORM_NODE_CONFIG = "http://jabber.org/protocol/pubsub#node_config"
|
||||
FORM_FIELD_PUBSUB_ACCESS_MODEL = "pubsub#access_model"
|
||||
|
||||
|
||||
SimpleJID = typing.Tuple[typing.Optional[str], str, typing.Optional[str]]
|
||||
T = typing.TypeVar("T")
|
||||
@@ -211,7 +224,7 @@ def make_avatar_metadata_set_request(
|
||||
def _require_child(t: ET.Element, tag: str) -> ET.Element:
|
||||
el = t.find(tag)
|
||||
if el is None:
|
||||
raise abort(500, "malformed reply")
|
||||
raise abort(500, "malformed reply: missing {}".format(tag))
|
||||
return el
|
||||
|
||||
|
||||
@@ -283,3 +296,73 @@ def extract_avatar_data_get_reply(
|
||||
if data is None or data.text is None:
|
||||
return None
|
||||
return base64.b64decode(data.text)
|
||||
|
||||
|
||||
def make_pubsub_node_config_put_request(
|
||||
to: str, node: str,
|
||||
id_: typing.Optional[str] = None,
|
||||
) -> typing.Tuple[ET.Element, ET.Element]:
|
||||
req = ET.Element("iq", type="set", to=to)
|
||||
q = ET.SubElement(req, "pubsub", xmlns=NS_PUBSUB_OWNER)
|
||||
configure = ET.SubElement(q, "configure", node=node)
|
||||
form = ET.SubElement(configure, "x",
|
||||
xmlns=NS_DATA_FORM,
|
||||
type="submit")
|
||||
form_type = ET.SubElement(form, "field", var="FORM_TYPE", type="hidden")
|
||||
ET.SubElement(form_type, "value").text = FORM_NODE_CONFIG
|
||||
return req, form
|
||||
|
||||
|
||||
def make_pubsub_node_config_get_request(
|
||||
to: str, node: str,
|
||||
) -> ET.Element:
|
||||
req = ET.Element("iq", type="get", to=to)
|
||||
q = ET.SubElement(req, "pubsub", xmlns=NS_PUBSUB_OWNER)
|
||||
ET.SubElement(q, "configure", node=node)
|
||||
return req
|
||||
|
||||
|
||||
def add_form_field(
|
||||
form: ET.Element,
|
||||
var: str,
|
||||
values: typing.Union[str, typing.Collection[str]],
|
||||
type_: typing.Optional[str] = None,
|
||||
) -> ET.Element:
|
||||
if isinstance(values, str):
|
||||
values = [values]
|
||||
field = ET.SubElement(form, "field", var=var)
|
||||
if type_ is not None:
|
||||
field.set("type", type_)
|
||||
for v in values:
|
||||
ET.SubElement(field, "value").text = v
|
||||
return field
|
||||
|
||||
|
||||
def make_pubsub_access_model_put_request(
|
||||
to: str,
|
||||
node: str,
|
||||
new_access_model: str,
|
||||
) -> ET.Element:
|
||||
req, form = make_pubsub_node_config_put_request(to, node)
|
||||
add_form_field(form, FORM_FIELD_PUBSUB_ACCESS_MODEL, new_access_model)
|
||||
return req
|
||||
|
||||
|
||||
def extract_pubsub_node_config_get_reply(
|
||||
iq_tree: ET.Element,
|
||||
) -> typing.Mapping[str, typing.Sequence[str]]:
|
||||
payload = extract_iq_reply(iq_tree)
|
||||
if payload is None:
|
||||
raise ValueError("invalid reply")
|
||||
form = _require_child(_require_child(payload, TAG_PUBSUB_CONFIGURE),
|
||||
TAG_DATA_FORM_X)
|
||||
result: typing.MutableMapping[str, typing.List[str]] = {}
|
||||
for child in form.findall(TAG_DATA_FORM_FIELD):
|
||||
var = child.get("var")
|
||||
if var is None:
|
||||
continue
|
||||
values = [value_tag.text or ""
|
||||
for value_tag in child.findall(TAG_DATA_FORM_VALUE)]
|
||||
result[var] = values
|
||||
|
||||
return result
|
||||
|
||||
Reference in New Issue
Block a user