Linkmail
A Linkspace Application
Available in the pkg or in repository/examples
#!/bin/env python3 # This is a bare bone mail TUI to write each other messages with your standard $EDITOR. import os,sys,logging,tempfile,subprocess,shlex,functools,cmd,argparse,getpass from typing import Tuple,List from linkspace import * from pathlib import Path #logging.basicConfig(stream=sys.stderr, level=logging.DEBUG) parser = argparse.ArgumentParser(description='Linkmail') parser.add_argument('--dir', dest='dir', type=str, default="",help='location of the linkspace instance (default: $LK_DIR | $HOME/linkspace)') parser.add_argument('--group', dest='group', type=str, default=os.environ.get('LK_GROUP','[#:pub]'),help='group (default: $LK_GROUP | [#:pub])') parser.add_argument('--key', dest='key', type=str, default=os.environ.get('LK_KEYNAME',"me:local"),help='use key (default: $LK_KEYNAME | me:local)') parser.add_argument('--password', dest='password', default=os.environ.get('LK_PASS'),help='use key (defaults: $LK_PASS )') args = parser.parse_args() lk = lk_open(dir=args.dir) print(lk_info(lk).dir) group=lk_eval(args.group) try: print("Using key:",lk_eval2str(f"[@:{args.key}/?b]")) except: print(args.key , " not found - we'll try creating it") args.password = args.password if args.password is not None else getpass.getpass(prompt='Password> ', stream=None) print("Unlocking key", args.password); key = lk_key(lk,password=lk_eval(args.password),name=args.key,create=True) lk_process(lk) # required for lk_encode to pick up name if lk_key just generated args.key = lk_encode(key.pubkey,"@/b") # ensure we use the preferred name for this key print(f"Key ok: ",args.key); common_q = lk_query_parse(lk_query(),"domain:=:linkmail","group:=:"+args.group) linkmail_keypoint = functools.partial(lk_keypoint,key=key,domain=b"linkmail",group=group) linkmail_linkpoint = functools.partial(lk_linkpoint,domain=b"linkmail",group=group) def tag_str(t): """strip nulls from 16 byte value and escape""" return lk_eval2str("[0/?a0]",argv=[t]) def links_str(links=[]): return "\n".join([tag_str(l.tag) + " " + lk_encode(l.ptr,"#/@/b") for l in links]) first_mail = "Hello world!" def user_write_mail(links = [],notes = "") -> Tuple[str,List[Link],str]: global first_mail editor = os.environ.get('VISUAL', os.environ.get('EDITOR', 'vi')) with tempfile.NamedTemporaryFile(mode='w+b', delete=False) as tf: tf.write(f"{first_mail}\n==LINKS==\n".encode()) tf.write(links_str(links).encode()) if notes: tf.write(f"\n==NOTES==\n{notes}".encode()) tf.flush() # will not work on windows - must first close file tmp_file = tf.name process = subprocess.Popen(shlex.split(f"{editor} \"{tmp_file}\"")) process.wait() first_mail = "" mail,*rest = Path(tmp_file).read_text("utf-8").split("\n==LINKS==\n",1) if not rest: return (mail,[],"") links,*notes = rest[0].split("\n==NOTES==\n",1) def read_link(line): print(line) [tag,link] = line.split(maxsplit=1) return Link(lk_eval(f"[a:{tag}]"),lk_eval(link)) links = [read_link(line) for line in links.splitlines() if line] return (mail,links,notes[0] if notes else "") def get_exchange_status(watch_finish=False): status =[] lk_status_watch(lk,qid=b"status", callback=lambda pkt: status.append(pkt) , timeout=lk_eval("[us:+2s]"), domain=b"exchange", group=group, objtype=b"process") ok = lk_process_while(lk,qid=b"status") return status intro = """ linkmail - A simple linkspace mail system Use 'new' to write a new linkmail. Use 'list [subj] [limit]' to print a list of linkmail. Use 'pull [subj]' to request mail from the group. Use 'queue' to print a list of recently received linkmail Use 'open [N]' to open a linkmail Every time a list is displayed with a number you can: - 'open <N>' to open it - 'link <N> [tag]' to save it for use during 'new' Use 'help list' for more options """ list_template = "[/or:[/?:[pubkey]/@]:\\[[pubkey/2mini]\\]]:[spacename:str] = [data/?a/slice::20/rpad:20: ] [create/us:delta/rfixed:18: ]:[hash/2mini] # [links_len:str]([:[/links:[tag:str] [ptr/2mini],]/~rcut:32])" class Linkmail(cmd.Cmd): intro =intro prompt = "> " links : List[Link] = [] notes = "Add notes here" lst : List[Pkt] = [] queue_in : List[Pkt] = [] # last from lst, or last 'open' last_shown : Pkt = linkmail_linkpoint(data="Nothing here so far",create=int(0).to_bytes(8,byteorder='big')) def precmd(self,line): lk_process(lk) return line def postcmd(self,stop,_line): if self.queue_in: print("New messages (use 'queue')") return stop def print_entry(self,pkt:Pkt,prnt=True,tag="") -> int: i = len(self.lst) self.lst.append(pkt) self.last_shown = pkt; if prnt: print(i,tag,lk_eval2str(list_template,pkt)) return i def print_list(self,lst:List[Pkt]): self.lst.clear() for p in lst: self.print_entry(p) def do_queue(self,_): """List the packets received since the last call to queue (or starting this proc)""" self.print_list(self.queue_in) self.queue_in.clear() def do_status(self,_): ex_status = get_exchange_status(True) if not ex_status: print(f"No exchange running for {args.group} - pull requests will be ignored") return print(f"Exchange for {args.group} ok:") for e in ex_status: print(lk_eval2str("([hash/2mini]) [comp2]/[comp3]\\n[data]\\n",e)) def do_pull(self,spacename= "",): """Notify the exchange process to start pulling messages (from [spacename])""" ex_status = get_exchange_status() if not ex_status: print(f"No exchange running for {args.group} - pull requests will be ignored") return print(f"Pulling from {args.group}") logging.debug(ex_status) q = lk_query(common_q) spacename_b = lk_eval(f"[/~/mail/{spacename}]") # we use the path in binary form. Two strings might differ but eval to the same bytes q = lk_query_push(q,"","qid",spacename_b) q = lk_query_push(q,"spacename","=",spacename_b) q = lk_query_push(q,"","follow",b"") lk_pull(lk,q) def do_new(self,spacename): """write a new mail""" spacename,*rest = shlex.split(spacename or "/") spacename_b = lk_eval(f"[/~/mail/{spacename}]") (data,links,notes) = user_write_mail(self.links,self.notes) self.notes = notes pkt = linkmail_keypoint(data=data,links=links,spacename=spacename_b) self.last_shown = pkt print(str(pkt)) if (input("Ok[Y/n]?") or "Y") in "Yy": if not get_exchange_status(): if not ( input(f"No exchange running for {args.group} - Write anyways? [Y/n]") or "Y" in "Yy" ): return lk_save(lk,pkt) self.links=[] def do_threads(self,spacename): """List all threads""" spacename,*rest = shlex.split(spacename or "/") q = lk_query(common_q) q = lk_query_parse(q,f"prefix:=:[/~/mail/{spacename}]","i_branch:=:[u32:0]",*rest) logging.debug(q) lst = [] lk_get_all(lk,q,lambda pkt: lst.append(pkt)) self.print_list(lst) def do_list(self,spacename): """list all messages [spacename] [limit] - e.g. list / recv:>:[now:-1D] pubkey:=:[@:alice:nl] create:>:[-2D]""" spacename,*rest = shlex.split(spacename or "/") q = lk_query(common_q) q = lk_query_parse(q,f"spacename:=:[/~/mail/{spacename}]",*rest) logging.debug(q) lst = [] lk_get_all(lk,q,lambda pkt: lst.append(pkt)) lst.sort(key = lambda pkt: pkt.create) self.print_list(lst) def do_open(self,idx): pkt = self.lst[int(idx)] if idx else self.last_shown self.last_shown = pkt print_template= "==[hash:str]==\\n[/~?:[pubkey]/@/b]\\n[spacename:str]\\n[create/us:str]\\n[data/~utf8]\\n" print(lk_eval2str(print_template,pkt)) self.lst.clear() for link in pkt.links: pkt = lk_get(lk,lk_hash_query(link.ptr)) if not pkt: print("\t",tag_str(link.tag), " ", lk_encode(link.ptr,"#/@/b")) else: print(len(self.lst),"\t",tag_str(link.tag), " ", b64(link.ptr,mini=True),lk_eval2str(list_template,pkt)) self.lst.append(pkt) def do_link(self,arg): """save a link used in the next mail""" if arg == "clear": self.links = [] return select,*rest = shlex.split(arg or "shown") link = self.last_shown.hash if select =="shown" else self.lst[int(select[0])].hash tag = rest[0] if rest else "link" self.links.append(Link(tag,link)) print("Current Links") print(links_str(self.links)) def do_EOF(self, _): return True def new_mail_pkt(self,pkt): if pkt.pubkey != key.pubkey: self.queue_in.append(pkt) dloop = Linkmail() new_mail = lk_query_parse(lk_query(common_q),":qid:incoming","prefix:=:/mail","i_db:<:[u32:0]") lk_watch(lk,new_mail,lambda p: dloop.new_mail_pkt(p)) dloop.do_pull("") while True: try: dloop.cmdloop() except Exception as e: logging.warning(e, exc_info=True)