Poezio Journal
Publishing Over XMPP.
About
Poezio Journal is a plugin for the software Poezio.
It allows to interact with people over Blasta, Libervia and Movim.
XEPs
Features
- Reading;
- Publishing; and
- Commenting.
Tasks
- Handle URI to automatically prompt command "browse";
- Notify of new items;
- Improve integration with the interface of Poezio, including skiming through Node names and Item IDs.
- Posting comments;
- Parsing reStructuredText (RST) to HTML;
- Register Poezio and command "browse" as handlers of URI Query;
- Error handling;
- Logging;
- Support Markdown; and
- Support HTML.
License
GPL Version 3.
Code
"""
Support Atom Over XMPP and interact with people over Blasta, Libervia and Movim.
XEP-0472: Pubsub Social Feed
XEP-0277: Journaling over XMPP
XEP-0060: Publish-Subscribe
Usage
-----
.. glossary::
/publish
**Usage:** ``/publish ``
This command publishes PubSub posts.
/browse
**Usage:** ``/browse ``
This command reads PubSub posts.
/comment
**Usage:** ``/comment ``
This command posts comments on PubSub posts.
"""
from datetime import datetime
import os
from poezio.plugin import BasePlugin
from slixmpp.exceptions import IqError, IqTimeout
from slixmpp.jid import InvalidJID
from slixmpp.stanza.iq import Iq
import subprocess
import tempfile
from urllib.parse import urlparse
import xml.etree.ElementTree as ET
CONTACT_TYPES = ['abuse', 'admin', 'feedback', 'sales', 'security', 'support']
class Plugin(BasePlugin):
def init(self):
self.api.add_command('publish', self.command_publish,
usage='',
short='Publish text to your PEP journal',
help='Publish text to your PEP journal. Item ID be set by MD5 checksum of link or randomly, if not specified')
self.api.add_command('browse', self.command_browse,
usage='',
short='Read posts of PEP journal of a given JID',
help='Read posts of PEP journal of a given JID. Item ID must be specified')
# self.api.add_command('comment', self.command_comment,
# usage='',
# short='Send a comment to a given post',
# help='Send a comment to a given post. Item ID must be specified')
def on_disco(self, iq):
info = iq['disco_info']
contacts = []
# iterate all data forms, in case there are multiple
for form in iq['disco_info']:
values = form.get_values()
if values['FORM_TYPE'][0] == 'http://jabber.org/network/serverinfo':
for var in values:
if not var.endswith('-addresses'):
continue
title = var[:-10] # strip '-addresses'
sep = '\n ' + len(title) * ' '
field_value = values[var]
if field_value:
value = sep.join(field_value) if isinstance(field_value, list) else field_value
contacts.append(f'{title}: {value}')
if contacts:
self.api.information('\n'.join(contacts), 'Contact Info')
else:
self.api.information(f'No Contact Addresses for {iq["from"]}', 'Error')
async def command_disco(self, jid):
try:
iq = await self.core.xmpp.plugin['xep_0030'].get_info(jid=jid, cached=False)
self.on_disco(iq)
except InvalidJID as exn:
self.api.information(f'Invalid JID “{jid}”: {exn}', 'Error')
except (IqError, IqTimeout,) as exn:
ifrom = exn.iq['from']
condition = exn.iq['error']['condition']
text = exn.iq['error']['text']
message = f'Error getting Contact Addresses from {ifrom}: {condition}: {text}'
self.api.information(message, 'Error')
async def command_publish(self, uri=None):
if uri:
pass
else:
jid_bare = self.core.xmpp.boundjid.bare
temp_file_content = reStructuredText.create_a_new_template(jid_bare)
editor = Utilities.get_system_editor() or 'vim'
fd, tmpfile = tempfile.mkstemp(prefix='poezio-edit-')
os.close(fd)
with open(tmpfile, 'w+', encoding='utf-8') as fp:
fp.write(temp_file_content)
fp.flush()
cmd = editor.split(' ')
cmd += (tmpfile,)
subprocess.call(cmd)
with open(tmpfile, 'r', encoding='utf-8') as f:
content = f.read()
os.remove(tmpfile)
parsed_content = reStructuredText.parse_temp_file_content(content)
title = parsed_content['title'] or 'Untitled'
desc = parsed_content['description']
date = parsed_content['date']
link = parsed_content['link']
tags = parsed_content['tags'].split(',')
item_id = parsed_content['slug']
node = parsed_content['category'] or 'urn:xmpp:microblog:0'
jid_bare = parsed_content['jid'] or self.core.xmpp.boundjid.bare
text_type = parsed_content['type'] or 'text'
text = parsed_content['text']
entry_new = {
'title' : title,
'link' : link,
'summary' : desc,
'content' : text,
'published' : date,
'updated' : UtilitiesDate.get_current_time_as_iso8601(),
'tags' : tags,
'type' : text_type,
'id' : item_id}
payload = XmlAtom.create_rfc4287_entry(entry_new)
await XmppPubSub.publish_node_item(
self, jid_bare, node, item_id, payload)
uri = f'xmpp:{jid_bare}?pubsub;node={node};item={item_id}'
await self.command_browse(uri)
async def command_browse(self, uri):
result = None
pubsub, node, item_id = Utilities.parse_uri_query(uri)
if not pubsub:
self.api.information(f'URI {uri} has no Jabber ID.', 'Error')
return
if not node:
self.api.information(f'URI {uri} has no Node Name.', 'Error')
return
if item_id:
iq = await XmppPubSub.get_node_item(self, pubsub, node, item_id)
if iq:
link = XmppUtilities.form_an_item_link(pubsub, node, item_id)
atom = XmlAtom.extract_atom(iq)
result = XmlAtom.generate_atom_post(atom, pubsub, node, link)
else:
result = (f'Please ensure that PubSub Node Name "{node}" and '
f'Item ID "{item_id}" are valid and accessible.')
else:
iq = await XmppPubSub.get_node_items(self, pubsub, node)
if iq:
link = XmppUtilities.form_a_node_link(pubsub, node)
atom = XmlAtom.extract_atom(iq)
result = XmlAtom.generate_atom_post(atom, pubsub, node, link)
else:
result = (f'Please ensure that PubSub Node Name "{node}" is '
'valid and accessible.')
if result:
self.api.information(result, 'Journal')
else:
self.api.information(f'No Journal found for {iq["from"]}', 'Error')
class XmppPubSub:
async def publish_node_item(self, jid_bare, node, item_id, payload):
try:
iq = await self.core.xmpp.plugin['xep_0060'].publish(
jid_bare, node, id=item_id, payload=payload)
print(iq)
return iq
except (IqError, IqTimeout) as e:
print(e)
def create_node(self, jid_bare, node, title=None, subtitle=None):
#jid_from = self.boundjid.bare if self.is_component else None
if not title: title = 'My XMPP Journal'
if not subtitle: subtitle = 'A Poezio Journal Over XMPP'
iq = self.Iq(stype='set',
sto=jid_bare)
#sfrom=jid_from)
iq['pubsub']['create']['node'] = node
form = iq['pubsub']['configure']['form']
form['type'] = 'submit'
form.addField('pubsub#title',
ftype='text-single',
value=title)
form.addField('pubsub#description',
ftype='text-single',
value=subtitle)
form.addField('pubsub#notify_retract',
ftype='boolean',
value=1)
form.addField('pubsub#max_items',
ftype='text-single',
value='50')
form.addField('pubsub#persist_items',
ftype='boolean',
value=1)
form.addField('pubsub#send_last_published_item',
ftype='text-single',
value='never')
form.addField('pubsub#deliver_payloads',
ftype='boolean',
value=0)
form.addField('pubsub#type',
ftype='text-single',
value='http://www.w3.org/2005/Atom')
return iq
async def get_node_item(self, pubsub, node, item_id):
try:
iq = await self.core.xmpp.plugin['xep_0060'].get_item(
pubsub, node, item_id, timeout=5)
return iq
except InvalidJID as exn:
self.api.information(f'Invalid JID “{jid}”: {exn}', 'Error')
except (IqError, IqTimeout,) as exn:
ifrom = exn.iq['from']
condition = exn.iq['error']['condition']
text = exn.iq['error']['text']
message = f'Error receiving node item {item_id} from {ifrom}: {condition}: {text}'
self.api.information(message, 'Error')
async def get_node_items(self, pubsub, node):
try:
iq = await self.core.xmpp.plugin['xep_0060'].get_items(
pubsub, node, timeout=5)
return iq
except InvalidJID as exn:
self.api.information(f'Invalid JID “{jid}”: {exn}', 'Error')
except (IqError, IqTimeout,) as exn:
ifrom = exn.iq['from']
condition = exn.iq['error']['condition']
text = exn.iq['error']['text']
message = f'Error receiving node items from {ifrom}: {condition}: {text}'
self.api.information(message, 'Error')
class XmppUtilities:
def form_a_node_link(pubsub, node):
link = 'xmpp:{pubsub}?;node={node}'.format(pubsub=pubsub, node=node)
return link
def form_an_item_link(pubsub, node, item_id):
link = 'xmpp:{pubsub}?;node={node};item={item}'.format(
pubsub=pubsub, node=node, item=item_id)
return link
class XmlAtom:
def extract_atom(iq: Iq):
"""Extract data from an Atom Syndication Format (RFC 4287) of a Publish-Subscribe (XEP-0060) node item."""
jid = iq['from'].bare
node = iq['pubsub']['items']['node']
atom = {}
atom['title'] = jid
atom['subtitle'] = node
atom['language'] = iq['pubsub']['items']['lang']
atom['items'] = []
items = iq['pubsub']['items']
for item in list(items)[::-1]:
atom_item = {}
item_payload = item['payload']
namespace = '{http://www.w3.org/2005/Atom}'
title = item_payload.find(namespace + 'title')
links = item_payload.find(namespace + 'link')
if (not isinstance(title, ET.Element) and
not isinstance(links, ET.Element)): continue
title_text = 'No title' if title == None else title.text
atom_item['title'] = title_text
if isinstance(links, ET.Element):
atom_item['links'] = []
for link in item_payload.findall(namespace + 'link'):
link_href = link.attrib['href'] if 'href' in link.attrib else ''
link_type = link.attrib['type'] if 'type' in link.attrib else ''
link_rel = link.attrib['rel'] if 'rel' in link.attrib else ''
atom_item['links'].append({'href': link_href,
'rel': link_rel,
'type': link_type})
contents = item_payload.find(namespace + 'content')
atom_item['contents'] = []
if isinstance(contents, ET.Element):
for content in item_payload.findall(namespace + 'content'):
if not content.text: continue
content_text = content.text
content_type = content.attrib['type'] if 'type' in content.attrib else 'html'
content_type_text = 'html' if 'html' in content_type else 'text'
atom_item['contents'].append({'text' : content_text,
'type' : content_type_text})
else:
summary = item_payload.find(namespace + 'summary')
summary_text = summary.text if summary else None
if summary_text:
summary_type = summary.attrib['type'] if 'type' in summary.attrib else 'html'
summary_type_text = 'html' if 'html' in summary_type else 'text'
atom_item['contents'].append(summary_text)
# else:
# atom_item['contents'].append('No content.')
published = item_payload.find(namespace + 'published')
published_text = '' if published == None else published.text
atom_item['published'] = published_text
updated = item_payload.find(namespace + 'updated')
updated_text = '' if updated == None else updated.text
atom_item['updated'] = updated_text
atom_item['authors'] = []
authors = item_payload.find(namespace + 'author')
if isinstance(authors, ET.Element):
for author in item_payload.findall(namespace + 'author'):
atom_item_author = {}
author_email = author.find(namespace + 'email')
if author_email is not None:
author_email_text = author_email.text
if author_email_text:
atom_item_author['email'] = author_email_text
else:
author_email_text = None
author_uri = author.find(namespace + 'uri')
if author_uri is not None:
author_uri_text = author_uri.text
if author_uri_text:
atom_item_author['uri'] = author_uri_text
else:
author_uri_text = None
author_name = author.find(namespace + 'name')
if author_name is not None and author_name.text:
author_name_text = author_name.text
else:
author_name_text = author_uri_text or author_email_text
atom_item_author['name'] = author_name_text
atom_item['authors'].append(atom_item_author)
categories = item_payload.find(namespace + 'category')
atom_item['categories'] = []
if isinstance(categories, ET.Element):
for category in item_payload.findall(namespace + 'category'):
if 'term' in category.attrib and category.attrib['term']:
category_term = category.attrib['term']
atom_item['categories'].append(category_term)
identifier = item_payload.find(namespace + 'id')
if identifier is not None and identifier.attrib: print(identifier.attrib)
identifier_text = item['id'] if identifier == None else identifier.text
atom_item['id'] = identifier_text
#atom_item['id'] = item['id']
atom['items'].append(atom_item)
return atom
def generate_atom_post(atom: dict, pubsub: str, node: str, link: str):
"""Generate a textual article from a Publish-Subscribe (XEP-0060) node items."""
# link = XmppUtilities.form_a_node_link(pubsub, node)
# subtitle = 'XMPP PubSub Syndication Feed'
article = ''
article += f"JID: {atom['title']}\n"
article += f"Node: {atom['subtitle']}\n"
article += f"Link: {link}\n\n"
for item in atom['items']:
# Title
article += f"Title: {item['title']}\n"
#article += f"Subtitle: {item['subtitle']}\n"
# Published
published = UtilitiesDate.convert_iso8601_to_readable(item['updated'])
article += f"Published: {published}\n"
# Updated
updated = UtilitiesDate.convert_iso8601_to_readable(item['updated'])
article += f"Updated: {updated}\n"
# Author
authors = item['authors'] if 'authors' in item else None
if authors:
article += "Authors:\n"
for author in authors:
article += f"{author['name'] or author['uri'] or author['email']}\n"
if 'email' in author and author['email']:
article += f"mailto:{author['email']}\n"
if 'uri' in author and author['uri']:
article += f"{author['uri']}\n"
# Summary
summaries = item['summary'] if 'summary' in item else None
article += "Summary:\n"
if summaries:
for summary in summaries:
article += f"{summary['text']}\n"
else:
article += "No summary.\n"
# Content
contents = item['contents'] if 'contents' in item else None
article += "Content:\n"
if contents:
for content in contents:
article += f"{content['text']}\n"
# TODO Check type and parse accordingly content['type']
else:
article += "No content.\n"
# Categories
categories = item['categories']
if categories:
article += "Categories:\n"
for category in categories:
article += f"{category}; "
article += f"\n"
# Links
article += f"Links:\n"
links = item['links'] if 'links' in item else None
if links:
for link in links:
article += f"{link['href']}\n"
link_xmpp = XmppUtilities.form_an_item_link(pubsub, node, item['id'])
article += f"{link_xmpp}\n"
# ID
article += f"ID: {item['id']}\n"
article += "\n"
return article
def create_rfc4287_entry(feed_entry):
node_entry = ET.Element('entry')
node_entry.set('xmlns', 'http://www.w3.org/2005/Atom')
# Title
title = ET.SubElement(node_entry, 'title')
title.set('type', 'text')
title.text = feed_entry['title']
# Summary
summary = ET.SubElement(node_entry, 'summary')
summary.set('type', 'text')
#summary.set('lang', feed_entry['summary_lang'])
summary.text = feed_entry['summary']
# Content
content = ET.SubElement(node_entry, 'content')
content.set('type', feed_entry['type'])
#content.set('lang', feed_entry['content_lang'])
content.text = feed_entry['content']
# Tags
if feed_entry['tags']:
for term in feed_entry['tags']:
tag = ET.SubElement(node_entry, 'category')
tag.set('term', term)
# Link
link = ET.SubElement(node_entry, "link")
link.set('href', feed_entry['link'])
# Links
# for feed_entry_link in feed_entry['links']:
# link = ET.SubElement(node_entry, "link")
# link.set('href', feed_entry_link['url'])
# link.set('type', feed_entry_link['type'])
# link.set('rel', feed_entry_link['rel'])
# Date saved
if 'published' in feed_entry and feed_entry['published']:
published = ET.SubElement(node_entry, 'published')
published.text = feed_entry['published']
# Date edited
if 'updated' in feed_entry and feed_entry['updated']:
updated = ET.SubElement(node_entry, 'updated')
updated.text = feed_entry['updated']
return node_entry
class Utilities:
def get_system_editor():
"""Returns default system editor is $EDITOR is set."""
return os.environ.get('EDITOR', 'none')
def parse_uri_query(uri):
"""Extract Jabber ID, Node Name and Item ID from a given URI."""
pubsub = node = item_id = None
parsed_uri = urlparse(uri)
pubsub = parsed_uri.path
parsed_uri_query = parsed_uri.query
for query_item in parsed_uri_query.split(';'):
if query_item:
if query_item.startswith('node'):
node = query_item[5:]
if query_item.startswith('item'):
item_id = query_item[5:]
return pubsub, node, item_id
class reStructuredText:
def create_a_new_template(jid_bare):
return (''
'.. title: \n'
'.. slug: \n'
'.. date: \n'
'.. tags: \n'
'.. category: urn:xmpp:microblog:0\n'
'.. link: \n'
'.. description: \n'
'.. jid: {}\n'
'.. type: \n\n').format(jid_bare)
def parse_temp_file_content(content):
title = slug = date = tags = category = \
link = description = jid = type = ''
kvd = {
'title' : title,
'slug' : slug,
'date' : date,
'tags' : tags,
'category' : category,
'link' : link,
'description' : description,
'jid' : jid,
'type' : type}
lines = content.strip().split('\n')
for line in lines:
if lines.index(line) > 9: break
for i in kvd:
if line.startswith(f'.. {i}:'):
kvd[i] = line[len(f'.. {i}:'):].strip()
content_lines = lines[9:] # TODO
kvd['text'] = '\n'.join(content_lines).strip()
return kvd
class UtilitiesDate:
def get_current_time_as_iso8601():
return datetime.now().isoformat()
def convert_iso8601_to_readable(timestamp):
old_date_format = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
new_date_format = old_date_format.strftime("%B %d, %Y")
return new_date_format