From a2f19397fb89d282f96d8552d7a2a756535fb1ef Mon Sep 17 00:00:00 2001 From: Daniel Schulte Date: Sun, 18 Mar 2018 16:56:49 +0100 Subject: [PATCH] Initial commit --- .gitignore | 2 + README.md | 43 +++++++ config.toml.example | 7 ++ printer.py | 266 ++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 2 + 5 files changed, 320 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 config.toml.example create mode 100755 printer.py create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5016350 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +config.toml +venv/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..866a2bd --- /dev/null +++ b/README.md @@ -0,0 +1,43 @@ +# About: +This application monitors your Twitter home timeline or any number (within the Twitter API limits) of hashtags of your choice and prints all incoming Tweets via an ESC/POS compatible thermal printer. + +# Installation: +### System requirements +This application has been developed and tested on Linux, targeting python3.6 or newer. +It might work on macOS (likely) and Windows (very unlikely). + +### Create a virtualenv with the required dependencies +```bash +python3 -m venv venv +venv/bin/pip install -r requirements.txt +``` +### Getting Twitter API access +- If you already have access enter your consumer key/secret and access token/secret in the next step. +- If you don't have such credentials you must create a Twitter App and get the required information after you've created it. + +### Create and fill in config.toml +```bash +cp config.toml.example config.toml +$EDITOR config.toml +# Put the required information into config.toml +``` + +# Printing Tweets +### Check your thermal printer +- Ensure it supports ESC/POS. +- See if your operating system detects your printer (look at the output of dmesg). +- Check if the user that will be running this application can access the printer (check `ls -l /dev/usb/lp*`, `id $USER`). +- A line width of 48 characters and Latin-1 as printer codepage 6 are assumed. +- This application was tested with a Excelvan/Hoin HOP-E801 printer. No other models/printers have been tested yet. + +### Printing your home timeline +```bash +venv/bin/python3 printer.py --printer /path/to/printer +``` +If you don't want to see retweets make sure to include the `-n` or`--no-retweets` option in the commandline above + +### Printing a specific (or multiple) hashtags +```bash +venv/bin/python3 printer.py --printer /path/to/print "#hashtag1" "#hashtag2" +``` +If you want to see retweets in this mode make sure to include the `-r` or `--retweets` option in the commandline above diff --git a/config.toml.example b/config.toml.example new file mode 100644 index 0000000..39cf235 --- /dev/null +++ b/config.toml.example @@ -0,0 +1,7 @@ +[auth.consumer] +key="PUT_YOUR_CONSUMER_KEY_HERE" +secret="PUT_YOUR_CONSUMER_SECRET_HERE" + +[auth.access] +token="PUT_YOUR_ACCESS_TOKEN_HERE" +secret="PUT_YOUR_ACCESS_SECRET_HERE" diff --git a/printer.py b/printer.py new file mode 100755 index 0000000..f2d7750 --- /dev/null +++ b/printer.py @@ -0,0 +1,266 @@ +#!/usr/bin/env python3 + +""" +TODO: Pseude Retweets (aka Commments). Tweets in which the only URL is to another Tweet. +TODO: Figure out if/when to show unshortened URLs in links contained in Tweets. +TODO: Figure out bimap mode so stuff that is not latin-1 encodable can still be printed. +""" + +import argparse +import datetime +import html +import logging +import sys + +import toml +import tweepy + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("tweet-printer") + +def load_config(): + fn = "config.toml" + cfg = {} + try: + with open(fn, "r") as f: + cfg = toml.load(f) + except FileNotFoundError: + logger.error(f"Can't open config file {fn}") + sys.exit(1) + except toml.TomlDecodeError as ex: + logger.error(f"Can't parse config file {fn}: {ex}") + sys.exit(1) + + if "auth" not in cfg: + logger.error("Can't find auth section in configuration data.") + sys.exit(1) + authdata = cfg["auth"] + if "consumer" not in authdata: + logger.error("Can't find consumer auth section in configuration data.") + sys.exit(1) + + consumerdata = authdata["consumer"] + if "key" not in consumerdata or not consumerdata["key"]: + logger.error("Can't find consumer key in configuration data.") + sys.exit(1) + if "secret" not in consumerdata or not consumerdata["secret"]: + logger.error("Can't find consumer secret in configuration data.") + sys.exit(1) + + if "access" not in authdata: + logger.error("Can't find access auth section of configuration data.") + sys.exit(1) + accessdata = authdata["access"] + if "token" not in accessdata: + logger.error("Can't find access token in configuration data.") + sys.exit(1) + if "secret" not in accessdata: + logger.error("Can't find access secret in configuration data.") + sys.exit(1) + + return cfg + + +def format_header(handle, name, dt, is_rt=None): + space_total = 28 + header = "" + if is_rt is None: + is_rt = False + if is_rt: + header += "RT by " + header += f"@{handle} "[:28] + space_for_name = space_total-len(header)-2 + + shortened_by=0 + + if len(name)<=space_for_name: + header+=f"({name})" + else: + shortened_name_len = space_for_name-3 + if shortened_name_len>0: + shortened_by = len(name)-shortened_name_len + header+=f"({name[:shortened_name_len]}...)" + + header = header.ljust(28) + header += dt.strftime(" %Y-%m-%d %H:%M:%S") + #if shortened_by>0: + # header += f" name shoretened by {shortened_by}" + return header + + +class Printer: + def __init__(self, port): + self.port = port + + def __enter__(self): + self.printer = open(self.port, "wb") + self.printer.write(bytes([0x1b, 0x40])) # Initialize printer + self.printer.write(bytes([0x1b, 0x74, 0x06])) # Set character code table to CP1252/Latin-1 + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.printer.close() + + def write(self, data, newline=True, cut=False): + if type(data) is str: + data = data.encode("latin-1") + elif type(data) is not bytes: + logger.error("Unsupported type for Printer.write: "+str(type(data))) + return + self.printer.write(data) + if newline: + self.printer.write(bytes([13, 10])) + if cut: + self.printer.write(bytes([0x1d, 0x56, # Feed and Cut + 0x42, # Feed to cut position with extra length and cut + 10 # extra*Y Unit exta space + ])) + + def flush(self): + self.printer.flush() + + +class StreamListener(tweepy.StreamListener): + def __init__(self, title, ignore_rt=False, printer=None): + super().__init__() + self.title = title + self.seen_tweets = set() + self.ignore_rt = ignore_rt + self.printer = printer + + def on_status(self, status): + + if status.id in self.seen_tweets: + print("********", status.id, "Already seen this tweet before") + return True + + #with open("status.log", "at+") as f: + # f.write(repr(status)) + # f.write("\n\n") + + rt = getattr(status, "retweeted_status", None) + + if not rt: + print("********", status.id) + self.seen_tweets.add(status.id) + text = status.extended_tweet["full_text"] if status.truncated else status.text + text = html.unescape(text) + #print("[{}] New tweet from @{}: {}".format(self.title, status.user.screen_name, text)) + header = format_header(status.user.screen_name, status.user.name, status.created_at) + print(header) + print(text) + if self.printer: + try: + encoded_header = header.encode("latin-1") + encoded_text = text.encode("latin-1") + self.printer.write(encoded_header) + self.printer.write(encoded_text, True, True) + self.printer.flush() + except UnicodeEncodeError: + print("Can't print. Encoding issue.") + else: + if self.ignore_rt: + #print("******** Ignoring Retweet") + return True + if rt.id in self.seen_tweets: + print("********", rt.id, "Already seen this retweeted before") + return True + print("********", rt.id) + self.seen_tweets.add(rt.id) + rt_text = rt.extended_tweet["full_text"] if rt.truncated else rt.text + rt_text = html.unescape(rt_text) + #print("[{}] New retweet by @{}:".format(self.title, status.user.screen_name)) + #print("@{}: {}".format(rt.user.screen_name, rt_text)) + header = format_header(status.user.screen_name, status.user.name, status.created_at, is_rt=True) + rt_header = format_header(rt.user.screen_name, rt.user.name, rt.created_at) + print(header) + print(rt_header) + print(rt_text) + return True + + def on_error(self, status_code): + if status_code == 420: + return False + +""" +------------------------------------------------ +@handle (name) xxxx-xx-xx xx:xx:xx +RT by @handle (name) xxxx-xx-xx xx:xx:xx +---------------------------- +""" + + +def check_rate_limits(api): + rate_limits = api.rate_limit_status() + for category, endpoints in rate_limits["resources"].items(): + for endpoint, data in endpoints.items(): + limit = int(data["limit"]) + remaining = int(data["remaining"]) + reset = data["reset"] + if limit!=remaining and endpoint not in ["/application/rate_limit_status"]: + print(endpoint, f"{remaining}/{limit}", + "Reset in:", datetime.datetime.fromtimestamp(reset)-datetime.datetime.now()) + + +def main(): + + parser = argparse.ArgumentParser(description="Prints tweets to a thermal printer") + parser.add_argument("-p", "--printer", type=str, metavar="printer", + help="Which thermal printer to use (e.g. /dev/usb/lp1). If empty nothing will be printed.") + rt_handling = parser.add_mutually_exclusive_group() + rt_handling.add_argument("-n", "--no-retweets", action="store_true", default=None, + help="Don't show retweets (default when monitoring hashtags)") + rt_handling.add_argument("-r", "--retweets", action="store_true", default=None, + help="Show retweets (default when showing user timeline)") + parser.add_argument(metavar="keyword", nargs="*", type=str, dest="keywords", + help="What to track. You can specify hashtags or keywords here. If empty the user timeline will be monitored instead") + + args = parser.parse_args() + + config = load_config() + + consumer_key = config["auth"]["consumer"]["key"] + consumer_secret = config["auth"]["consumer"]["secret"] + + access_token = config["auth"]["access"]["token"] + access_secret = config["auth"]["access"]["secret"] + + auth = tweepy.OAuthHandler(consumer_key, consumer_secret) + auth.set_access_token(access_token, access_secret) + + api = tweepy.API(auth) + + check_rate_limits(api) + + show_rts = None + if len(args.keywords) == 0: + show_rts = False if args.no_retweets else True + else: + show_rts = True if args.retweets else False + + stream_name = ",".join(args.keywords) if args.keywords else "Timeline" + + def start_streaming(listener, api): + stream = tweepy.Stream(listener=listener, auth=api.auth) + if args.keywords: + stream.filter(track=args.keywords) + else: + stream.userstream() + + try: + if args.printer: + with Printer(args.printer) as thermal: + logger.info("Streaming {} to thermal printer {}...".format(stream_name, thermal.port)) + listener = StreamListener(stream_name, ignore_rt=not show_rts, printer=thermal) + start_streaming(listener, api) + else: + logger.info("Streaming {}...".format(stream_name)) + listener = StreamListener(stream_name, ignore_rt=not show_rts) + start_streaming(listener, api) + + except KeyboardInterrupt: + pass + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..247abe0 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +toml==0.9.4 +tweepy==3.6.0