"""Subspace-related request handlers for Bubble: the subspace index,
subspace administration, search, and Gempub archive export."""

import datetime
import io
import re
import string
import time
import urllib.parse as urlparse
import zipfile

from model import Search, Segment, User, Subspace, Post
from utils import *


def make_subspace(session):
    if not session.user:
        return 60, "Login required"
    if session.user.role == User.LIMITED:
        return 61, "Not authorized"
    if not session.bubble.user_subspaces and session.user.role == User.BASIC:
        return 61, "Not authorized"
    db = session.db
    req = session.req
    if is_empty_query(req):
        return 10, "Enter name for new subspace: " + session.NAME_HINT
    name = clean_query(req)
    if not is_valid_name(name):
        return 10, "That is an invalid subspace name. " + session.NAME_HINT
    try:
        db.create_subspace(name, session.user.id)
        return 30, f'/s/{name}/admin'
    except:
        return 10, "That subspace already exists. " + session.NAME_HINT


def make_subspaces_page(session):
    user = session.user
    req = session.req
    db = session.db

    LABELS = {
        'active': 'Activity',
        'alpha': 'Summary',
        'chrono': 'Chronological Summary',
        'name': 'Index'
    }
    LOCK = ' 🔒'
    ACTIVE_AGE = 60 * 24 * 3600
    ARCHIVAL_MONTHS = 12
    ARCHIVAL_AGE = ARCHIVAL_MONTHS * 30 * 24 * 3600

    arg = clean_query(req)
    if arg == 'mode':
        page = "Choose view mode for the subspace index:\n\n"
        for key, label in LABELS.items():
            page += f"=> /s/?{key} {label}\n"
        return page

    page = '# Subspaces\n\n'
    if user:
        page += session.dashboard_link()
        if user.role != User.LIMITED and (session.bubble.user_subspaces or
                                          user.role == User.ADMIN):
            page += f'=> /new-subspace 🌒 New subspace\n'
    page += '=> ?mode Change view mode\n'
    page += '=> / Back to front page\n\n'

    # View mode is determined by the query string.
    if arg in LABELS:
        view_mode = arg
    else:
        view_mode = 'name'

    subs = db.get_subspaces(owner=0)
    locked_subs = db.get_subspaces(locked=True)
    locked_ids = set([sub.id for sub in locked_subs])

    def sub_link(sub):
        lock_icon = LOCK if sub.id in locked_ids else ''
        return f'=> /{sub.title()} {sub.title()}{lock_icon}\n'

    def sub_stats(sub):
        stats = []
        if sub.num_posts:
            kind = 'issue' if sub.flags & Subspace.ISSUE_TRACKER else 'post'
            stats.append(f"{sub.num_posts} {kind}{plural_s(sub.num_posts)}")
        if sub.num_cmts:
            stats.append(f"{sub.num_cmts} comment{plural_s(sub.num_cmts)}")
        if sub.num_people > 1:
            stats.append(f"{sub.num_people} people")
        return ' · '.join(stats) + "\n"

    def sub_latest_post(sub):
        latest = db.get_post(id=sub.latest_post_id) if sub.latest_post_id else None
        if latest:
            title = latest.quoted_title()
            age = latest.age(tz=session.tz)
            return f"{title} by {latest.poster_avatar} {latest.poster_name} · {age}\n"
        return ''

    def sub_heading(sub):
        i = sub.name[0].upper()
        if i in string.digits or i in string.punctuation:
            return '0-9'
        if i in string.ascii_uppercase:
            return i
        return 'Other'

    if len(subs) == 0:
        page += 'No subspaces.\n'

    elif view_mode == 'name':
        use_headings = len(subs) >= 50
        last_heading = None
        page += f'## {LABELS[view_mode]}\n'
        if not use_headings:
            page += '\n'
        for sub in subs:
            if use_headings:
                heading = sub_heading(sub)
                if heading != last_heading:
                    last_heading = heading
                    page += f'\n### {heading}\n'
            page += sub_link(sub)

    elif view_mode == 'alpha' or view_mode == 'chrono':
        # Basic alphabetical index.
        page += f'## {LABELS[view_mode]}\n'
        if view_mode == 'chrono':
            page += '\nSubspaces with recent activity are listed first.\n'
            subs = sorted(subs, key=lambda s: s.ts_active, reverse=True)
        for sub in subs:
            page += '\n' + sub_link(sub)
            if sub.num_posts > 0:
                kind = 'issue' if sub.flags & Subspace.ISSUE_TRACKER else 'post'
                page += f"{sub.num_posts} {kind}{plural_s(sub.num_posts)} · {ago_text(sub.ts_active, tz=session.tz)}\n"
            else:
                page += 'Empty\n'

    else:
        # Divide into Active, Dormant, Empty, and Pending Archival.
        active = []
        dormant = []
        empty = []
        pending = []
        now_time = time.time()
        for sub in sorted(subs, key=lambda s: s.ts_active, reverse=True):
            since_active = now_time - sub.ts_active
            if since_active > ARCHIVAL_AGE:
                pending.append(sub)
            elif not sub.latest_post_id:
                empty.append(sub)
            elif since_active > ACTIVE_AGE:
                dormant.append(sub)
            else:
                active.append(sub)

        if len(active):
            page += f'## {len(active)} Active\n'
            for sub in active:
                page += '\n' + sub_link(sub)
                page += sub_stats(sub)
                page += sub_latest_post(sub)
        if len(dormant):
            page += f'\n## {len(dormant)} Inactive 😴\n'
            for sub in dormant:
                page += '\n' + sub_link(sub)
                page += sub_stats(sub)
                page += sub_latest_post(sub)
        if len(empty):
            page += f'\n## {len(empty)} Empty\n\n'
            for sub in empty:
                page += sub_link(sub)
        if len(pending):
            page += f'\n## {len(pending)} Pending Archival 🪦\n'
            page += f'No activity in the last {ARCHIVAL_MONTHS} months:\n'
            for sub in pending:
                page += '\n' + sub_link(sub)
                page += sub_stats(sub)

    return page


def subspace_admin_actions(session, action):
    if not session.is_user_mod:
        return 61, "Moderator rights required"
    user = session.user
    req = session.req
    db = session.db
    subspace = session.context
    admin_link = f'/{subspace.title()}/admin'
    is_subspace_deletable = subspace.owner == 0 and (
        user.role == User.ADMIN or db.is_empty_subspace(subspace))

    page = f'# {subspace.title()}: Administration\n'

    if user.role == User.ADMIN:
        m = re.search(r'/repo/(new|delete|clone-url|view-url|idlabel)?(/([0-9a-zA-Z]{10}))?$',
                      req.path)
        if m:
            if m[1] == 'new':
                db.create_repository(subspace)
                return 30, f'{admin_link}/repo/'
            elif m[1] == 'delete' and m[3]:
                if not db.verify_token(user, m[3]):
                    return 61, 'Not authorized'
                if is_empty_query(req):
                    return 10, 'Really delete repository and commit history? (DELETE to confirm)'
                if req.query == 'DELETE':
                    db.destroy_repository(db.get_repository(subspace=subspace))
                return 30, f'{admin_link}/repo/'
            elif m[1] == 'clone-url':
                if req.query is None:
                    return 10, 'HTTPS URL where to clone repository:'
                db.update_repository(db.get_repository(subspace=subspace),
                                     clone_url=clean_query(req))
                return 30, f'{admin_link}/repo/'
            elif m[1] == 'view-url':
                if req.query is None:
                    return 10, 'Gemini URL for viewing commits:'
                db.update_repository(db.get_repository(subspace=subspace),
                                     view_url=clean_query(req))
                return 30, f'{admin_link}/repo/'
            elif m[1] == 'idlabel':
                if req.query is None:
                    return 10, 'Label for marking issue IDs in commit messages: (For example, "IssueID")'
                db.update_repository(db.get_repository(subspace=subspace),
                                     idlabel=clean_query(req))
                return 30, f'{admin_link}/repo/'

            # No action in the path: show the repository settings page.
            page += f'=> {admin_link} Go back\n'
            page += '\n## Git Repository\n'
            repo = db.get_repository(subspace=subspace)
            if not repo:
                page += f'=> {admin_link}/repo/new 🛢️ New repository\n'
                return page
            page += f'\n=> {admin_link}/repo/clone-url Clone HTTPS URL: {repo.clone_url if repo.clone_url else "(not set)"}\n'
            page += f'=> {admin_link}/repo/idlabel Issue ID label: {repo.idlabel if repo.idlabel else ""}\n'
            page += f'\n=> {admin_link}/repo/view-url Commit view Gemini URL: {repo.view_url if repo.view_url else "(not set)"}\n'

            # Status information.
            page += '\n### Status\n'
            if repo.ts_fetch is None:
                if not repo.clone_url:
                    page += 'Not configured.\n'
                else:
                    page += 'Repository will be fetched soon.\n'
            else:
                n = db.count_commits(repo)
                page += f'{n} commits in history. Repository was last fetched on {datetime.datetime.fromtimestamp(repo.ts_fetch, UTC).strftime("%Y-%m-%d at %H:%M:%S %Z")}.\n'

            page += f'\n=> {admin_link}/repo/delete/{db.get_token(user)} ❌ Delete repository\n'
            return page

    if action == 'info':
        if req.query is None:
            return 10, f"Description for {session.context.title()}:"
        db.update_subspace(session.context, info=clean_description(clean_query(req)),
                           actor_id=user.id)
        return 30, admin_link

    if action == 'url':
        if req.query is None:
            return 10, f"Featured link for {session.context.title()}: (URL and label)"
        try:
            link = form_link(parse_link_segment_query(req))
        except:
            link = ''
        db.update_subspace(session.context, url=link, actor_id=user.id)
        return 30, admin_link

    if action == 'omit-all':
        if session.context.flags & Subspace.HIDE_OMIT_SETTING_FLAG:
            return 61, 'Not authorized'
        db.update_subspace(session.context,
                           flags=session.context.flags ^ Subspace.OMIT_FROM_ALL_FLAG)
        return 30, admin_link

    # Actions that require link verification.
    m = re.search(r'/(delete|tracker|rename|add-mod|remove-mod)/([0-9a-zA-Z]{10})$', req.path)
    if m:
        token = m[2]
        if not db.verify_token(session.user, token):
            return 61, 'Not authorized'

        if m[1] == 'add-mod':
            if is_empty_query(req):
                return 10, 'Enter user to add as moderator:'
            adding = db.get_user(name=clean_query(req))
            if not adding:
                return 51, 'Not found'
            db.modify_mods(session.context, actor=session.user, add=adding)
            return 30, admin_link

        if m[1] == 'remove-mod':
            if is_empty_query(req):
                return 10, 'Enter user to remove as moderator:'
            removing = db.get_user(name=clean_query(req))
            if not removing:
                return 51, 'Not found'
            db.modify_mods(session.context, actor=session.user, remove=removing)
            return 30, admin_link

        if m[1] == 'rename':
            prompt = f'Enter new name for {session.context.name}? (Warning: Links to subspace will break!)'
            if is_empty_query(req):
                return 10, prompt
            new_name = clean_query(req)
            if not is_valid_name(new_name):
                return 10, prompt
            try:
                db.update_subspace(session.context, name=new_name, actor_id=user.id)
            except:
                return 10, prompt
            return 30, f'/s/{new_name}/admin'

        if m[1] == 'delete' and is_subspace_deletable:
            if is_empty_query(req):
                return 10, f'Really delete {session.context.title()}? (Enter DELETE to confirm.)'
            if req.query == 'DELETE':
                db.destroy_subspace(subspace)
                return 30, '/dashboard'
            return 30, admin_link

        elif m[1] == 'tracker' and is_subspace_deletable:
            new_flags = session.context.flags ^ Subspace.ISSUE_TRACKER
            if new_flags & Subspace.ISSUE_TRACKER:
                # Issues shouldn't be listed in All Posts.
                new_flags = new_flags | Subspace.OMIT_FROM_ALL_FLAG
            db.update_subspace(session.context, flags=new_flags)
            return 30, admin_link

    page += session.context.subspace_link()
    if not session.context.flags & (Subspace.ISSUE_TRACKER | Subspace.HIDE_OMIT_SETTING_FLAG):
        page += f'\n=> {admin_link}/omit-all {session.CHECKS[session.context.flags & Subspace.OMIT_FROM_ALL_FLAG]} Omit from All Posts\n'

    page += '\n## About\n'
    page += '\n### Description\n'
    page += (session.context.info if session.context.info else '(no description)') + '\n'
    page += f'=> {admin_link}/info ✏️ Edit\n'
    page += '\n### Featured Link\n'
    page += (f'=> {session.context.url}' if session.context.url else '(no featured link)') + '\n'
    page += f'=> {admin_link}/url ✏️ Edit\n'

    page += '\n## Moderators\n\n'
    mods = db.get_mods(session.context)
    for mod in mods:
        page += f'=> /u/{mod.name} {mod.avatar} {mod.name}\n'
    page += f'=> {admin_link}/add-mod/{session.get_token()} Add moderator\n'
    if len(mods) > (0 if user.role == User.ADMIN else 1):
        page += f'=> {admin_link}/remove-mod/{session.get_token()} Remove moderator\n'

    if is_subspace_deletable:
        page += '\n## Issue Tracking\n'
        page += f'\n=> {admin_link}/tracker/{db.get_token(session.user)} {session.CHECKS[nonzero(session.context.flags & Subspace.ISSUE_TRACKER)]} Subspace is an issue tracker\n'
        page += 'Posts in an issue tracker are designated issue IDs and have an Open/Closed status. Issues may refer to Git repository commits via hash, and commit messages can refer to issues by ID. Non-issue posts are not allowed in an issue tracker subspace.\n'
        if session.user.role == User.ADMIN:
            page += f'\n=> {admin_link}/repo/ ⚙️ Git repository settings\n'

    page += '\n## Actions\n'
    page += '\n=> /export/' + session.context.title() + '.gpub 📤 Export data archive\n'
    page += f'Download a ZIP archive containing all posts and comments in {session.context.title()}. The archive has Gempub metadata so it can also be viewed in a Gempub reader.\n'
    page += f'\n=> {admin_link}/rename/{session.get_token()} Rename subspace\n'
    page += 'Links pointing to the subspace will break when the name is changed.\n'

    if is_subspace_deletable:
        page += f'\n=> {admin_link}/delete/{session.get_token()} ⚠️ Delete subspace {session.context.title()}\n'
        if not db.is_empty_subspace(subspace):
            page += 'All posts and comments in the subspace will be deleted. Exporting a backup beforehand is recommended.\n'
        else:
            page += 'There are no posts in the subspace.\n'

    return page


def split_terms(text):
    import shlex
    return list(filter(lambda t: len(t) >= 2,
                       map(str.strip, shlex.split(text.replace("'", "\\'")))))


def make_search_page(session):
    req = session.req
    db = session.db
    user = session.user
    LIMIT = 30

    m = re.match(r'(/([us])/([\w%-]+))?/search(/(\d+))?', req.path)
    if not m:
        return 59, 'Bad request'
    if m[2] or m[3]:
        ident = urlparse.unquote(m[3])
        scope = db.get_subspace(name=ident)
        # Verify the scope exists before checking its type.
        if not scope:
            return 51, 'Not found'
        if m[2] == 'u' and not scope.owner:
            return 51, 'Not found'
    else:
        scope = None
    page_index = max(0, int(m[5])) if m[5] else 0

    if req.query is None:
        return 10, f'Search {"in " + scope.title() if scope else session.bubble.site_name}:'

    search_url = ('/' if not scope else f'/{scope.title()}/') + 'search'
    terms = split_terms(clean_query(req))

    if scope:
        page = f'# Search in {scope.title()}\n'
    else:
        page = '# Search\n'
    page += f'=> {search_url} 🔍 New search\n'
    if scope:
        page += f'=> /{scope.title()} Back to {scope.title()}\n'
    else:
        page += '=> / 🌒 Back to front page\n'
    if terms:
        page += '\n## ' + ' '.join(terms) + '\n'

    # Perform the search.
    search = Search(db)
    count = search.run(terms, scope, limit=LIMIT, page_index=page_index)
    # TODO: Just counting the matches without returning anything might be
    # a useful addition in `model.Search`.
    #page += f'Found {count} match{plural_s(count, "es")}.\n'

    if page_index > 0:
        page += f'\n=> {search_url}/{page_index - 1}?{req.query} Previous page\n'
    if count == 0:
        page += 'Found nothing matching the search terms.\n' if page_index == 0 else \
            'No more results.\n'

    for result in search.results:
        page += '\n'
        #ts = result[0]
        obj = result[1]
        if isinstance(obj, User):
            page += f'=> /u/{obj.name} {obj.avatar} u/{obj.name}\n'
            if obj.info:
                page += f'{clean_title(strip_links(obj.info))[:300].strip()}\n'
        elif isinstance(obj, Subspace):
            page += f'=> /s/{obj.name} s/{obj.name}\n'
            if obj.info:
                page += f'{clean_title(strip_links(obj.info))[:300].strip()}\n'
        elif isinstance(obj, Post):
            ctx = ("u/" if obj.sub_owner else "s/") + obj.sub_name
            kind = "Comment" if obj.parent else f"Issue #{obj.issueid}" if obj.issueid else "Post"
            title = f' "{shorten_text(obj.title, 30)}"' if obj.title else ''
            scope_desc = f"in {ctx} " if not scope and not obj.sub_owner else ""
            page += f'=> /{ctx}/{obj.issueid if obj.issueid else obj.id} {kind}{title} {scope_desc}by {obj.poster_avatar} {obj.poster_name} on {obj.ymd_date(tz=session.tz)} {" · " if obj.tags else ""}{obj.tags}\n'
            SEGTYPES = ['content', 'URL', 'image', 'attachment', 'poll option']
            if result[2] != Segment.TEXT:
                page += f'(matching {SEGTYPES[result[2]]}) '
            page += obj.summary.replace('\n', ' ').replace('=>', ' ').strip() + '\n'

    if count >= LIMIT:
        page += f'\n=> {search_url}/{page_index + 1}?{req.query} Next page\nPage {page_index + 1}\n'
    return page


def listed_items(items):
    if len(items) == 0:
        return ''
    if len(items) == 1:
        return items[0]
    return ', '.join(items[0:-1]) + ' and ' + items[-1]


def make_timestamp(ts, fmt="%Y-%m-%d at %H:%M"):
    return datetime.datetime.fromtimestamp(ts, UTC).strftime(fmt)


class GempubArchive:
    class Entry:
        def __init__(self, post, label, page, file=None):
            self.ts = post.ts_created
            self.dt = datetime.datetime.fromtimestamp(self.ts, UTC)
            self.post_id = post.id
            self.issueid = post.issueid
            self.title = post.title
            self.subspace_id = post.subspace
            self.user_id = post.user
            self.label = label
            self.page = page
            self.file = file
            self.tags = post.tags
            self.num_cmts = post.num_cmts
            self.num_likes = post.num_likes
            self.referenced_from_posts = []

        def ymd(self):
            return self.dt.strftime('%Y-%m-%d')

        def path(self):
            if self.file:
                pos = self.file.segment_url.rfind('/') + 1
                return f'file{self.file.id}_{self.file.segment_url[pos:]}'
            # Clean up the title for use in the file name.
            fn = re.sub(r'[^\w\d-]', '', self.title.replace(' ', '-')).lower().strip()
            if len(fn) > 0:
                fn = '_' + fn
            #if len(fn) == 0:
            #    fn = f'{self.dt.day}_post{self.post_id}.gmi'
            return f'{self.dt.year:04d}-{self.dt.month:02d}/{self.post_id}{fn}.gmi'

    def __init__(self, session, user=None, subspace=None, month_range=None):
        self.session = session
        self.db = session.db
        self.ts_range = None
        if month_range:
            year, month = month_range
            end_month = month + 1 if month < 12 else 1
            end_year = year if month < 12 else year + 1
            self.ts_range = (
                datetime.datetime(year, month, 1, 0, 0, 0, tzinfo=UTC).timestamp(),
                datetime.datetime(end_year, end_month, 1, 0, 0, 0, tzinfo=UTC).timestamp()
            )
        self.user = user
        self.subspace = subspace
        self.is_user = self.ts_range is None and subspace.owner != 0
        assert self.is_user and self.user or not self.is_user and not self.user
        assert self.ts_range or self.subspace is not None

        # Modify session so rendered pages appear to be not logged in.
        session.is_archive = True
        session.user = None

        self.site_link = session.server_root()
        if month_range:
            archive_title = f'{datetime.datetime(year, month, 1).strftime("%B %Y")}'
            archive_description = f'All posts and comments made on {session.bubble.site_name}. '
        else:
            archive_title = f'{"s/" if not self.is_user else ""}{subspace.name} on {session.bubble.site_name}'
            archive_description = \
                (f'All posts and comments made in the subspace {subspace.title()} on {session.bubble.site_name}. '
                 if not self.is_user else
                 f'All posts and comments made by {user.name} on {session.bubble.site_name}. ')
        self.metadata = {
            'gpubVersion': '1.0.0',
            'title': archive_title,
            'description': archive_description,
            'author': f'Bubble v{session.bubble.version}',
            'publishDate': time.strftime('%Y-%m-%d'),
            'index': 'index.gmi'
        }

        self.local_entries = []     # posts in the archive's subspace
        self.foreign_entries = []   # posts in other subspaces
        self.subspace_entries = {}  # subspace name => list of entries
        self.comment_entries = []   # posts where user has commented
        self.file_entries = []      # files
        self.entry_index = {}       # indexed by post ID
        self.file_index = {}        # indexed by file ID
        self.referenced_users = {}  # info about posters
        self.total_count = [0, 0]
        self.subspace_count = {}    # [posts, comments]
        self.subspaces = {}
        self.users = {}

        if self.is_user:
            self.users[self.user.id] = user
            self.add_user_page(self.user)

    def user_page(self, user):
        src = f'# {user.avatar} {user.name}\n'
        if user.info:
            src += user.info + '\n'
        if user.url:
            src += f'=> {user.url}\n'
        src += f'\n\n=> {self.site_link}/u/{user.name} {user.name} on {self.session.bubble.site_name}\n'
        src += 'The account was created on ' + \
            make_timestamp(user.ts_created, '%Y-%m-%d') + '.\n'
        return src

    def get_subspace(self, id):
        if id not in self.subspaces:
            self.subspaces[id] = self.db.get_subspace(id=id)
        return self.subspaces[id]

    def get_user(self, id):
        if id not in self.users:
            self.users[id] = self.db.get_user(id=id)
        return self.users[id]

    def add_user_page(self, user):
        if user.name not in self.referenced_users:
            self.referenced_users[user.name] = (user, self.user_page(user))

    def add_post_entry(self, post, is_comment=False):
        from feeds import make_post_page
        self.add_user_page(self.get_user(post.user))

        # Modify session according to the post's subspace.
        self.session.context = self.get_subspace(post.subspace)
        self.session.is_context_tracker = \
            (self.session.context.flags & Subspace.ISSUE_TRACKER) != 0

        is_local = (post.subspace == self.subspace.id) if self.subspace else False
        if not self.ts_range:
            where = self.session.context.title() if not is_local and (
                not self.is_user or is_comment) else None
            label_sub = ' · ' + where if where else ''

        page = make_post_page(self.session, post)
        if self.ts_range:
            label = shorten_text(clean_title(strip_links(post.summary)), 150)
        else:
            label = (post.title if post.title else
                     shorten_text(clean_title(strip_links(post.summary)), 100)) + label_sub
        entry = GempubArchive.Entry(post, label, page)

        # Check for referenced users.
        for username in re.findall(r'=> /u/([\w-]+)\s', page):
            ref = self.db.get_user(name=username)
            if ref:
                self.add_user_page(ref)

        if is_comment:
            self.comment_entries.append(entry)
        elif is_local:
            self.local_entries.append(entry)
        else:
            self.foreign_entries.append(entry)

        skey = self.session.context.name
        if skey in self.subspace_entries:
            self.subspace_entries[skey].append(entry)
        else:
            self.subspace_entries[skey] = [entry]

        if post.id not in self.entry_index:
            if not is_comment:
                self.add_count(post.subspace,
                               (1, self.db.count_posts(parent_id=post.id, draft=False)))
            self.entry_index[post.id] = entry

    def add_count(self, subspace_id, count):
        self.total_count[0] += count[0]
        self.total_count[1] += count[1]
        if subspace_id not in self.subspace_count:
            self.subspace_count[subspace_id] = [count[0], count[1]]
        else:
            self.subspace_count[subspace_id][0] += count[0]
            self.subspace_count[subspace_id][1] += count[1]

    def render_post_entries(self):
        db = self.db
        # Entries for the user/subspace posts.
        if self.is_user:
            posts = db.get_posts(user=self.user, comment=False, draft=False)
        elif self.ts_range:
            posts = db.get_posts(ts_range=self.ts_range, comment=False, draft=False,
                                 sort_descending=False)
        else:
            posts = db.get_posts(subspace=self.subspace, comment=False, draft=False)
        for post in posts:
            self.add_post_entry(post)

        if self.is_user:
            # Make entries for posts the user has commented on.
            # TODO: Add a proper database query for this.
            commented_in = set()
            for cmt in db.get_posts(user=self.user, comment=True, draft=False,
                                    sort_descending=False):
                commented_in.add(cmt.parent)
            for post in [db.get_post(id=post_id) for post_id in commented_in]:
                if post and post.user != self.user.id:
                    self.add_post_entry(post, is_comment=True)

    def render_file_entries(self):
        db = self.db
        for file in db.get_user_files(self.user) if self.user \
                else db.get_subspace_files(self.subspace) if self.subspace \
                else db.get_time_files(self.ts_range):
            post = db.get_post(id=file.segment_post)
            filesize = len(file.data)
            entry = GempubArchive.Entry(
                post,
                file.segment_label + f' [{filesize / 1024:.1f} KB, {file.mimetype}]',
                file.data,
                file)
            self.file_entries.append(entry)
            self.file_index[file.id] = entry

    def rewrite_internal_urls(self, entry: Entry):
        # Map site-absolute links in the rendered page to paths inside the
        # archive; anything unrecognized becomes an absolute URL to the server.
        src = entry.page
        src_post_id = entry.post_id
        user_pattern = re.compile(r'^=>\s*/u/([\w%-]+)\s')
        if self.subspace:
            post_pattern = re.compile(r'^=>\s*/([us])/' + self.subspace.name + r'/(\d+)\s')
        else:
            post_pattern = re.compile(r'^=>\s*/([us])/[\w%-]+/(\d+)\s')
        file_pattern = re.compile(r'^=>\s*/([us])/[\w%-]+/(image|file)/(\d+)[^ ]*\s')
        root_pattern = re.compile(r'^=>\s*/([^ ]*)\s')
        rewritten = []
        for line in src.split('\n'):
            m = user_pattern.search(line)
            if m:
                line = f'=> ../../users/{urlparse.unquote(m[1])}.gmi ' + line[m.end():]
                rewritten.append(line)
                continue
            m = post_pattern.search(line)
            if m:
                post_id = int(m[2])
                if post_id in self.entry_index:
                    line = f'=> ../../posts/{self.entry_index[post_id].path()} ' + line[m.end():]
                    rewritten.append(line)
                    continue
            m = file_pattern.search(line)
            if m:
                file_id = int(m[3])
                if file_id in self.file_index:
                    entry = self.file_index[file_id]
                    line = f'=> ../../files/{entry.path()} ' + line[m.end():]
                    rewritten.append(line)
                    entry.referenced_from_posts.append(src_post_id)
                    continue
            m = root_pattern.search(line)
            if m:
                line = f'=> {self.session.server_root()}/{m[1]} ' + line[m.end():]
                rewritten.append(line)
                continue
            rewritten.append(line)
        return '\n'.join(rewritten)

    def compress(self):
        # Create the ZIP archive.
        buffer = io.BytesIO()
        zip = zipfile.ZipFile(buffer, 'w', compression=zipfile.ZIP_DEFLATED,
                              compresslevel=9)

        def counter_text(count):
            parts = []
            if count[0]:
                parts.append(f'{count[0]} post{plural_s(count[0])}')
            if count[1]:
                parts.append(f'{count[1]} comment{plural_s(count[1])}')
            return ' and '.join(parts)

        with zip.open('metadata.txt', 'w') as f:
            for entry in self.metadata:
                f.write(f"{entry}: {self.metadata[entry]}\n".encode('utf-8'))

        with zip.open('title.gmi', 'w') as f:
            f.write(f"""
# {self.user.name if self.is_user else self.subspace.name if self.subspace else self.metadata['title']}
## Gempub Archive

{self.metadata['description']}

Exported on {self.metadata['publishDate']}.
""".encode('utf-8'))

        # Information about the user/subspace.
        if self.is_user:
            index_page = f'# {self.user.avatar} {self.user.name}\n\nTable of Contents:\n'
            index_page += '\n=> title.gmi Title page\n'
            profile_path = 'users/' + self.user.name + '.gmi'
            index_page += f'=> {profile_path} {self.user.avatar} {self.user.name}\n'
        elif self.subspace:
            index_page = f'# s/{self.subspace.name}\n\nTable of Contents:\n'
            index_page += '\n=> title.gmi Title page\n'
            profile_path = self.subspace.name + '.gmi'
            index_page += f'=> {profile_path} {self.subspace.name}\n'
            with zip.open(profile_path, 'w') as f:
                src = f'# {self.subspace.title()}\n'
                if self.subspace.info:
                    src += self.subspace.info + '\n'
                if self.subspace.url:
                    src += f'=> {self.subspace.url}\n'
                src += '\nThe subspace was created on ' + \
                    make_timestamp(self.subspace.ts_created, '%Y-%m-%d') + '.\n'
                f.write(src.encode('utf-8'))
        else:
            index_page = '# ' + self.metadata['title'] + '\n\nTable of Contents:\n\n'

        if self.local_entries:
            index_page += f'\n=> posts/index.gmi Posts in {self.subspace.title()}\n'
            local_index_page = f'# Posts in {self.subspace.title()}\n\n'
            for entry in self.local_entries:
                entry_path = 'posts/' + entry.path()
                local_index_page += f'=> {entry.path()} {entry.ymd()} {entry.label}\n'
                with zip.open(entry_path, 'w') as content:
                    content.write(self.rewrite_internal_urls(entry).encode('utf-8'))
            with zip.open('posts/index.gmi', 'w') as content:
                content.write(local_index_page.encode('utf-8'))

        if self.ts_range:
            sub_links = []
            for sub_name in sorted(self.subspace_entries.keys(), key=str.lower):
                first_entry = self.subspace_entries[sub_name][0]
                sub = self.get_subspace(first_entry.subspace_id)
                entry_path = f'{sub.title()[0]}_{sub.name}.gmi'
                sub_links.append(f'=> {entry_path} {sub.title()}\n')
                title_icon = ''
                if sub.owner:
                    title_icon = f'{self.get_user(first_entry.user_id).avatar} '
                sub_page = f'# {title_icon}{sub.title()}\n'
                sub_page += f'{counter_text(self.subspace_count[sub.id])} in this subspace.\n'
                for entry in self.subspace_entries[sub_name]:
                    entry_user = self.get_user(entry.user_id)
                    author = f'{entry_user.avatar} {entry_user.name}'
                    meta = []
                    top = None
                    if entry.issueid:
                        top = f'[#{entry.issueid}] {entry.title}'
                        meta.append(author)
                        if entry.tags:
                            top += f' · {entry.tags}'
                    elif not sub.owner:
                        meta.append(author)
                    meta.append(entry.dt.strftime('%Y-%m-%d %H:%M'))
                    if entry.num_cmts > 0:
                        meta.append(f'{entry.num_cmts} comment{plural_s(entry.num_cmts)}')
                    if entry.num_likes > 0:
                        meta.append(f'{entry.num_likes} like{plural_s(entry.num_likes)}')
                    if entry.tags and not entry.issueid:
                        meta.append(entry.tags)
                    link = f'=> posts/{entry.path()}'
                    if top:
                        sub_page += f'\n{link} {top}\n{entry.label}\n{" · ".join(meta)}\n'
                    else:
                        sub_page += f'\n{entry.label}\n{link} {" · ".join(meta)}\n'
                    # Write to the archive.
                    with zip.open('posts/' + entry.path(), 'w') as content:
                        content.write(self.rewrite_internal_urls(entry).encode('utf-8'))
                with zip.open(entry_path, 'w') as content:
                    content.write(sub_page.encode('utf-8'))
            prev_type = None
            for link in sorted(sub_links, key=str.lower):
                if prev_type and prev_type != link[3]:
                    index_page += '\n'
                index_page += link
                prev_type = link[3]  # u or s
            index_page += '\n'

        elif self.foreign_entries:
            index_page += f'=> other/index.gmi Posts in Other Subspaces\n'
            foreign_index_page = '# Posts in Other Subspaces\n'
            last_sub = None
            for entry in sorted(self.foreign_entries,
                                key=lambda e: self.get_subspace(e.subspace_id).name.lower()):
                entry_sub = self.get_subspace(entry.subspace_id)
                if entry_sub != last_sub:
                    foreign_index_page += f'\n## {entry_sub.name}\n'
                    last_sub = entry_sub
                entry_path = 'other/' + entry.path()
                foreign_index_page += f'=> {entry.path()} {entry.ymd()} {entry.label}\n'
                with zip.open(entry_path, 'w') as content:
                    content.write(self.rewrite_internal_urls(entry).encode('utf-8'))
            with zip.open('other/index.gmi', 'w') as content:
                content.write(foreign_index_page.encode('utf-8'))

        if self.comment_entries:
            index_page += f'=> comments/index.gmi Commented Posts\n'
            comment_index_page = '# Commented Posts\n'
            for entry in self.comment_entries:
                entry_path = 'comments/' + entry.path()
                comment_index_page += f'=> {entry.path()} {entry.ymd()} {entry.label}\n'
                with zip.open(entry_path, 'w') as content:
                    content.write(self.rewrite_internal_urls(entry).encode('utf-8'))
            with zip.open('comments/index.gmi', 'w') as content:
                content.write(comment_index_page.encode('utf-8'))

        if self.file_entries:
            index_page += '=> files/index.gmi File attachments\n'
            file_index_page = '# File Attachments\n'
            for entry in self.file_entries:
                entry_path = 'files/' + entry.path()
                file_index_page += f'\n=> {entry.path()} {entry.ymd()} {entry.label}\n'
                # List of posts that link to this file.
                for ref in entry.referenced_from_posts:
                    ref_entry = self.entry_index[ref]
                    file_index_page += f'=> ../posts/{ref_entry.path()} Referenced in: "{ref_entry.label}"\n'
                with zip.open(entry_path, 'w') as content:
                    content.write(entry.page)
            with zip.open('files/index.gmi', 'w') as content:
                content.write(file_index_page.encode('utf-8'))

        index_page += '=> users/index.gmi Users\n'
        users_index_page = '# Users\n\nPosts and comments in this archive reference these users:\n\n'
        # Sort users case insensitively.
        for ref, (user, profile_text) in \
                sorted(self.referenced_users.items(), key=lambda u: u[0].lower()):
            users_index_page += f'=> {ref}.gmi {user.avatar} {ref}\n'
            with zip.open('users/' + ref + '.gmi', 'w') as f:
                f.write(profile_text.encode('utf-8'))
        with zip.open('users/index.gmi', 'w') as f:
            f.write(users_index_page.encode('utf-8'))

        index_page += f'\n=> about/bubble.gmi 💬 About Bubble\n'
        with zip.open('about/bubble.gmi', 'w') as f:
            f.write(self.session.ABOUT.encode('utf-8'))

        with zip.open('index.gmi', 'w') as f:
            f.write(index_page.encode('utf-8'))

        zip.close()
        return buffer.getvalue()


def export_gempub_archive(session):
    req = session.req
    db = session.db
    user = session.user
    if not user:
        return 60, 'Login required'

    # Determine subspace to export.
    m = re.search(r'/export/(s/|month/)?([\w%-]+)\.gpub$', req.path)
    if not m or not m[2]:
        return 59, 'Bad request'
    name = urlparse.unquote(m[2])
    if m[1] == 'month/':
        month_range = map(int, m[2].split('-'))
        subspace = None
    else:
        month_range = None
        subspace = db.get_subspace(name=name)
    is_user = m[1] is None

    # Check access rights. At the moment, exporting is only possible via user
    # settings and subspace admin pages, so the user must have moderation
    # rights in the exported subspace.
    if month_range:
        if not user:
            # Have to be logged in.
            return 61, 'Not authorized'
    elif is_user:
        if subspace.owner != user.id:
            return 61, 'Not authorized'
    else:
        if user.id not in map(lambda u: u.id, db.get_mods(subspace)):
            return 61, 'Not authorized'

    archive = GempubArchive(session, user if is_user else None, subspace, month_range)
    archive.render_post_entries()
    archive.render_file_entries()
    data = archive.compress()
    return 20, 'application/gpub+zip', data
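

if __name__ == '__main__':
    # Minimal smoke test for the pure helpers above (an illustrative addition,
    # not part of the original handlers). Importing this module still requires
    # Bubble's own `model` and `utils` modules to be available.
    assert split_terms('hello "foo bar" a') == ['hello', 'foo bar']
    assert listed_items(['posts', 'comments', 'files']) == 'posts, comments and files'
    print('split_terms and listed_items behave as expected')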