#!/usr/bin/env python3
"""
Prints tweets to a thermal printer.

TODO: Pseudo Retweets (aka Comments). Tweets in which the only URL is to another Tweet.
TODO: Figure out if/when to show unshortened URLs in links contained in Tweets.
TODO: Figure out bitmap mode so stuff that is not Latin-1 encodable can still be printed.
"""

__version__ = "0.0.2"

import argparse
import datetime
import html
import logging
import sys
import textwrap

import toml
import tweepy

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("tweet-printer")

# Number of characters that fit on one printed line.
printer_line_width = 48


def _fail(message):
    """Log *message* as an error and terminate the process with status 1."""
    logger.error(message)
    sys.exit(1)


def load_config():
    """Load and validate ``config.toml`` from the current directory.

    The file must contain an ``auth.consumer`` table with non-empty ``key``
    and ``secret`` entries and an ``auth.access`` table with ``token`` and
    ``secret`` entries.

    Returns:
        The parsed configuration as a dict.

    Exits the process with status 1 (after logging an error) when the file
    is missing, cannot be parsed, or lacks a required entry.
    """
    fn = "config.toml"
    cfg = {}
    try:
        # TOML documents are UTF-8 by specification; don't rely on the locale.
        with open(fn, "r", encoding="utf-8") as f:
            cfg = toml.load(f)
    except FileNotFoundError:
        _fail(f"Can't open config file {fn}")
    except toml.TomlDecodeError as ex:
        _fail(f"Can't parse config file {fn}: {ex}")

    if "auth" not in cfg:
        _fail("Can't find auth section in configuration data.")
    authdata = cfg["auth"]

    if "consumer" not in authdata:
        _fail("Can't find consumer auth section in configuration data.")
    consumerdata = authdata["consumer"]
    if not consumerdata.get("key"):
        _fail("Can't find consumer key in configuration data.")
    if not consumerdata.get("secret"):
        _fail("Can't find consumer secret in configuration data.")

    if "access" not in authdata:
        _fail("Can't find access auth section of configuration data.")
    accessdata = authdata["access"]
    if "token" not in accessdata:
        _fail("Can't find access token in configuration data.")
    if "secret" not in accessdata:
        _fail("Can't find access secret in configuration data.")

    return cfg


def format_header(handle, name, dt, is_rt=None):
    """Build the one-line header printed above a tweet.

    The line consists of a left part, padded to ``printer_line_width - 20``
    characters, followed by a 20 character timestamp. The left part holds an
    optional ``"RT by "`` prefix, the ``@handle`` and the display name in
    parentheses. A name that does not fit is cut and suffixed with ``...``;
    if not even one character of it would fit, the name is left out.

    Args:
        handle: Screen name of the author, without the leading ``@``.
        name: Display name of the author.
        dt: ``datetime`` of the tweet.
        is_rt: Truthy to prefix the header with ``"RT by "``. ``None`` is
            treated as ``False``.

    Returns:
        The formatted header string.
    """
    space_for_datetime = 20
    space = printer_line_width - space_for_datetime

    header = "RT by " if is_rt else ""
    header += f"@{handle} "[:space]

    # Two columns are reserved for the surrounding parentheses.
    space_for_name = space - len(header) - 2
    if len(name) <= space_for_name:
        header += f"({name})"
    else:
        # Three more columns go to the "..." marker.
        shortened_name_len = space_for_name - 3
        if shortened_name_len > 0:
            header += f"({name[:shortened_name_len]}...)"

    header = header.ljust(space)
    header += dt.strftime(" %Y-%m-%d %H:%M:%S")
    return header


class Printer:
    """Context manager wrapping a thermal printer device file.

    Args:
        port: Path of the printer device (e.g. ``/dev/usb/lp1``).
        **kwargs: Optional settings. ``flipped`` (default ``False``) switches
            the printer to upside-down mode when the device is opened.
    """

    def __init__(self, port, **kwargs):
        self.port = port
        self.flipped = kwargs.get("flipped", False)
        self.printer = None

    def __enter__(self):
        self.printer = open(self.port, "wb")
        try:
            self.printer.write(bytes([0x1b, 0x40]))  # Initialize printer
            self.printer.write(bytes([0x1b, 0x74, 0x06]))  # Set character code table to CP1252/Latin-1
            if self.flipped:
                self.printer.write(bytes([0x1b, 0x7b, 0x01]))  # Set printer to upside-down mode
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so close here
            # to avoid leaking the device handle.
            self.printer.close()
            raise
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.printer.close()

    def write(self, data, newline=True, cut=False):
        """Send *data* to the printer.

        Args:
            data: ``str`` (encoded as Latin-1 before sending) or ``bytes``.
                Any other type is logged as an error and nothing is sent.
            newline: Append CR LF after the data.
            cut: Send the feed-and-cut command after the data.

        Raises:
            UnicodeEncodeError: If *data* is a ``str`` that cannot be encoded
                as Latin-1.
        """
        if isinstance(data, str):
            data = data.encode("latin-1")
        elif not isinstance(data, bytes):
            logger.error("Unsupported type for Printer.write: " + str(type(data)))
            return

        self.printer.write(data)
        if newline:
            self.printer.write(bytes([13, 10]))  # CR, LF
        if cut:
            self.printer.write(bytes([
                0x1d, 0x56,  # Feed and Cut
                0x42,        # Feed to cut position with extra length and cut
                10,          # extra*Y Unit extra space
            ]))

    def flush(self):
        """Flush buffered data to the device."""
        self.printer.flush()


def _tweet_text(status):
    """Return the HTML-unescaped text of *status*, using the extended text if truncated."""
    text = status.extended_tweet["full_text"] if status.truncated else status.text
    return html.unescape(text)


class StreamListener(tweepy.StreamListener):
    """Stream listener that echoes tweets to stdout and optionally prints them.

    Args:
        title: Human readable name of the stream.
        ignore_rt: When true, retweets are skipped.
        printer: An opened ``Printer`` or ``None`` to only write to stdout.
    """

    def __init__(self, title, ignore_rt=False, printer=None):
        super().__init__()
        self.title = title
        self.seen_tweets = set()
        self.ignore_rt = ignore_rt
        self.printer = printer

    @classmethod
    def split_and_encode_text(cls, text, extra_lines=None):
        """Wrap *text* to the printer width and encode every line as Latin-1.

        Args:
            text: Tweet text; may contain line breaks. Blank lines are kept
                as empty lines.
            extra_lines: Optional lines (e.g. headers) placed before the
                wrapped text. They are not wrapped, and the passed list is
                not modified.

        Returns:
            A list of ``bytes``, one entry per printed line.

        Raises:
            UnicodeEncodeError: If a line is not representable in Latin-1.
        """
        # Copy so that neither a shared default nor the caller's list is
        # mutated while appending the wrapped text.
        lines = list(extra_lines) if extra_lines else []
        for line in text.splitlines(True):
            stripped = line.strip()
            if stripped:
                lines.extend(textwrap.wrap(stripped, printer_line_width))
            else:
                lines.append(stripped)
        return [line.encode("latin-1") for line in lines]

    def print_tweet(self, text, headers):
        """Print *headers* followed by *text* and cut the paper.

        When the printer is flipped the lines are sent in reverse order.
        Nothing is printed if the content cannot be encoded as Latin-1.
        """
        try:
            encoded_lines = self.split_and_encode_text(text, headers)
            if self.printer.flipped:
                encoded_lines.reverse()
            for line in encoded_lines:
                self.printer.write(line)
            self.printer.write("", False, True)
            self.printer.flush()
        except UnicodeEncodeError:
            print("Can't print. Encoding issue.")

    def on_status(self, status):
        """Handle an incoming status; each tweet or retweeted tweet is shown once.

        Always returns ``True``.
        """
        if status.id in self.seen_tweets:
            print("********", status.id, "Already seen this tweet before")
            return True

        rt = getattr(status, "retweeted_status", None)
        if not rt:
            print("********", status.id)
            self.seen_tweets.add(status.id)
            text = _tweet_text(status)
            header = format_header(status.user.screen_name, status.user.name, status.created_at)
            print(header)
            print(text)
            if self.printer:
                self.print_tweet(text, [header])
            return True

        if self.ignore_rt:
            return True
        if rt.id in self.seen_tweets:
            print("********", rt.id, "Already seen this retweeted before")
            return True

        print("********", rt.id)
        self.seen_tweets.add(rt.id)
        rt_text = _tweet_text(rt)
        header = format_header(status.user.screen_name, status.user.name, status.created_at, is_rt=True)
        rt_header = format_header(rt.user.screen_name, rt.user.name, rt.created_at)
        print(header)
        print(rt_header)
        print(rt_text)
        if self.printer:
            self.print_tweet(rt_text, [header, rt_header])
        return True

    def on_error(self, status_code):
        """Return ``False`` for status code 420, ``None`` for any other code."""
        # NOTE(review): presumably False tells tweepy to disconnect when rate
        # limited -- confirm against the tweepy version in use.
        if status_code == 420:
            return False


def check_rate_limits(api):
    """Print every API endpoint whose rate limit has been partially used.

    Args:
        api: Object providing ``rate_limit_status()`` (a ``tweepy.API``).
    """
    rate_limits = api.rate_limit_status()
    for endpoints in rate_limits["resources"].values():
        for endpoint, data in endpoints.items():
            limit = int(data["limit"])
            remaining = int(data["remaining"])
            reset = data["reset"]
            # Querying the limits consumes this endpoint itself; skip it.
            if limit != remaining and endpoint != "/application/rate_limit_status":
                print(
                    endpoint,
                    f"{remaining}/{limit}",
                    "Reset in:",
                    datetime.datetime.fromtimestamp(reset) - datetime.datetime.now(),
                )


def main():
    """Parse the command line, authenticate and stream tweets until interrupted."""
    parser = argparse.ArgumentParser(description="Prints tweets to a thermal printer")
    parser.add_argument("-p", "--printer", type=str, metavar="printer", help="Which thermal printer to use (e.g. /dev/usb/lp1). If empty nothing will be printed.")
    rt_handling = parser.add_mutually_exclusive_group()
    rt_handling.add_argument("-n", "--no-retweets", action="store_true", default=None, help="Don't show retweets (default when monitoring hashtags)")
    rt_handling.add_argument("-r", "--retweets", action="store_true", default=None, help="Show retweets (default when showing user timeline)")
    parser.add_argument(metavar="keyword", nargs="*", type=str, dest="keywords", help="What to track. You can specify hashtags or keywords here. If empty the user timeline will be monitored instead")
    args = parser.parse_args()

    config = load_config()
    printer_config = config.get("printer", {})
    flip_printer = printer_config.get("flipped", False)

    consumer_key = config["auth"]["consumer"]["key"]
    consumer_secret = config["auth"]["consumer"]["secret"]
    access_token = config["auth"]["access"]["token"]
    access_secret = config["auth"]["access"]["secret"]

    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_secret)
    api = tweepy.API(auth)

    check_rate_limits(api)

    stream_name = ",".join(args.keywords) if args.keywords else "Timeline"
    # Retweets are shown by default for the timeline and hidden by default
    # when tracking keywords; -n / -r override the respective default.
    if args.keywords:
        show_rts = bool(args.retweets)
    else:
        show_rts = not args.no_retweets
    stream_name += " (including Retweets)" if show_rts else " (without Retweets)"

    def start_streaming(listener, api):
        stream = tweepy.Stream(listener=listener, auth=api.auth)
        if args.keywords:
            stream.filter(track=args.keywords)
        else:
            stream.userstream()

    try:
        if args.printer:
            with Printer(args.printer, flipped=flip_printer) as thermal:
                logger.info("Streaming {} to thermal printer {}...".format(stream_name, thermal.port))
                listener = StreamListener(stream_name, ignore_rt=not show_rts, printer=thermal)
                start_streaming(listener, api)
        else:
            logger.info("Streaming {}...".format(stream_name))
            listener = StreamListener(stream_name, ignore_rt=not show_rts)
            start_streaming(listener, api)
    except KeyboardInterrupt:
        pass


if __name__ == "__main__":
    main()