Initial commit
This commit is contained in:
commit
a2f19397fb
|
@ -0,0 +1,2 @@
|
||||||
|
config.toml
|
||||||
|
venv/
|
|
@ -0,0 +1,43 @@
|
||||||
|
# About:
|
||||||
|
This application monitors your Twitter home timeline or any number (within the Twitter API limits) of hashtags of your choice and prints all incoming Tweets via an ESC/POS compatible thermal printer.
|
||||||
|
|
||||||
|
# Installation:
|
||||||
|
### System requirements
|
||||||
|
This application has been developed and tested on Linux, targeting python3.6 or newer.
|
||||||
|
It might work on macOS (likely) and Windows (very unlikely).
|
||||||
|
|
||||||
|
### Create a virtualenv with the required dependencies
|
||||||
|
```bash
|
||||||
|
python3 -m venv venv
|
||||||
|
venv/bin/pip install -r requirements.txt
|
||||||
|
```
|
||||||
|
### Getting Twitter API access
|
||||||
|
- If you already have access enter your consumer key/secret and access token/secret in the next step.
|
||||||
|
- If you don't have such credentials you must create a Twitter App and get the required information after you've created it.
|
||||||
|
|
||||||
|
### Create and fill in config.toml
|
||||||
|
```bash
|
||||||
|
cp config.toml.example config.toml
|
||||||
|
$EDITOR config.toml
|
||||||
|
# Put the required information into config.toml
|
||||||
|
```
|
||||||
|
|
||||||
|
# Printing Tweets
|
||||||
|
### Check your thermal printer
|
||||||
|
- Ensure it supports ESC/POS.
|
||||||
|
- See if your operating system detects your printer (look at the output of dmesg).
|
||||||
|
- Check if the user that will be running this application can access the printer (check `ls -l /dev/usb/lp*`, `id $USER`).
|
||||||
|
- A line width of 48 characters and Latin-1 as printer codepage 6 are assumed.
|
||||||
|
- This application was tested with a Excelvan/Hoin HOP-E801 printer. No other models/printers have been tested yet.
|
||||||
|
|
||||||
|
### Printing your home timeline
|
||||||
|
```bash
|
||||||
|
venv/bin/python3 printer.py --printer /path/to/printer
|
||||||
|
```
|
||||||
|
If you don't want to see retweets make sure to include the `-n` or`--no-retweets` option in the commandline above
|
||||||
|
|
||||||
|
### Printing a specific (or multiple) hashtags
|
||||||
|
```bash
|
||||||
|
venv/bin/python3 printer.py --printer /path/to/print "#hashtag1" "#hashtag2"
|
||||||
|
```
|
||||||
|
If you want to see retweets in this mode make sure to include the `-r` or `--retweets` option in the commandline above
|
|
@ -0,0 +1,7 @@
|
||||||
|
[auth.consumer]
|
||||||
|
key="PUT_YOUR_CONSUMER_KEY_HERE"
|
||||||
|
secret="PUT_YOUR_CONSUMER_SECRET_HERE"
|
||||||
|
|
||||||
|
[auth.access]
|
||||||
|
token="PUT_YOUR_ACCESS_TOKEN_HERE"
|
||||||
|
secret="PUT_YOUR_ACCESS_SECRET_HERE"
|
|
@ -0,0 +1,266 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
"""
|
||||||
|
TODO: Pseude Retweets (aka Commments). Tweets in which the only URL is to another Tweet.
|
||||||
|
TODO: Figure out if/when to show unshortened URLs in links contained in Tweets.
|
||||||
|
TODO: Figure out bimap mode so stuff that is not latin-1 encodable can still be printed.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import datetime
|
||||||
|
import html
|
||||||
|
import logging
|
||||||
|
import sys
|
||||||
|
|
||||||
|
import toml
|
||||||
|
import tweepy
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
logger = logging.getLogger("tweet-printer")
|
||||||
|
|
||||||
|
def load_config():
|
||||||
|
fn = "config.toml"
|
||||||
|
cfg = {}
|
||||||
|
try:
|
||||||
|
with open(fn, "r") as f:
|
||||||
|
cfg = toml.load(f)
|
||||||
|
except FileNotFoundError:
|
||||||
|
logger.error(f"Can't open config file {fn}")
|
||||||
|
sys.exit(1)
|
||||||
|
except toml.TomlDecodeError as ex:
|
||||||
|
logger.error(f"Can't parse config file {fn}: {ex}")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
if "auth" not in cfg:
|
||||||
|
logger.error("Can't find auth section in configuration data.")
|
||||||
|
sys.exit(1)
|
||||||
|
authdata = cfg["auth"]
|
||||||
|
if "consumer" not in authdata:
|
||||||
|
logger.error("Can't find consumer auth section in configuration data.")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
consumerdata = authdata["consumer"]
|
||||||
|
if "key" not in consumerdata or not consumerdata["key"]:
|
||||||
|
logger.error("Can't find consumer key in configuration data.")
|
||||||
|
sys.exit(1)
|
||||||
|
if "secret" not in consumerdata or not consumerdata["secret"]:
|
||||||
|
logger.error("Can't find consumer secret in configuration data.")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
if "access" not in authdata:
|
||||||
|
logger.error("Can't find access auth section of configuration data.")
|
||||||
|
sys.exit(1)
|
||||||
|
accessdata = authdata["access"]
|
||||||
|
if "token" not in accessdata:
|
||||||
|
logger.error("Can't find access token in configuration data.")
|
||||||
|
sys.exit(1)
|
||||||
|
if "secret" not in accessdata:
|
||||||
|
logger.error("Can't find access secret in configuration data.")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
return cfg
|
||||||
|
|
||||||
|
|
||||||
|
def format_header(handle, name, dt, is_rt=None):
|
||||||
|
space_total = 28
|
||||||
|
header = ""
|
||||||
|
if is_rt is None:
|
||||||
|
is_rt = False
|
||||||
|
if is_rt:
|
||||||
|
header += "RT by "
|
||||||
|
header += f"@{handle} "[:28]
|
||||||
|
space_for_name = space_total-len(header)-2
|
||||||
|
|
||||||
|
shortened_by=0
|
||||||
|
|
||||||
|
if len(name)<=space_for_name:
|
||||||
|
header+=f"({name})"
|
||||||
|
else:
|
||||||
|
shortened_name_len = space_for_name-3
|
||||||
|
if shortened_name_len>0:
|
||||||
|
shortened_by = len(name)-shortened_name_len
|
||||||
|
header+=f"({name[:shortened_name_len]}...)"
|
||||||
|
|
||||||
|
header = header.ljust(28)
|
||||||
|
header += dt.strftime(" %Y-%m-%d %H:%M:%S")
|
||||||
|
#if shortened_by>0:
|
||||||
|
# header += f" name shoretened by {shortened_by}"
|
||||||
|
return header
|
||||||
|
|
||||||
|
|
||||||
|
class Printer:
|
||||||
|
def __init__(self, port):
|
||||||
|
self.port = port
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self.printer = open(self.port, "wb")
|
||||||
|
self.printer.write(bytes([0x1b, 0x40])) # Initialize printer
|
||||||
|
self.printer.write(bytes([0x1b, 0x74, 0x06])) # Set character code table to CP1252/Latin-1
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_value, traceback):
|
||||||
|
self.printer.close()
|
||||||
|
|
||||||
|
def write(self, data, newline=True, cut=False):
|
||||||
|
if type(data) is str:
|
||||||
|
data = data.encode("latin-1")
|
||||||
|
elif type(data) is not bytes:
|
||||||
|
logger.error("Unsupported type for Printer.write: "+str(type(data)))
|
||||||
|
return
|
||||||
|
self.printer.write(data)
|
||||||
|
if newline:
|
||||||
|
self.printer.write(bytes([13, 10]))
|
||||||
|
if cut:
|
||||||
|
self.printer.write(bytes([0x1d, 0x56, # Feed and Cut
|
||||||
|
0x42, # Feed to cut position with extra length and cut
|
||||||
|
10 # extra*Y Unit exta space
|
||||||
|
]))
|
||||||
|
|
||||||
|
def flush(self):
|
||||||
|
self.printer.flush()
|
||||||
|
|
||||||
|
|
||||||
|
class StreamListener(tweepy.StreamListener):
|
||||||
|
def __init__(self, title, ignore_rt=False, printer=None):
|
||||||
|
super().__init__()
|
||||||
|
self.title = title
|
||||||
|
self.seen_tweets = set()
|
||||||
|
self.ignore_rt = ignore_rt
|
||||||
|
self.printer = printer
|
||||||
|
|
||||||
|
def on_status(self, status):
|
||||||
|
|
||||||
|
if status.id in self.seen_tweets:
|
||||||
|
print("********", status.id, "Already seen this tweet before")
|
||||||
|
return True
|
||||||
|
|
||||||
|
#with open("status.log", "at+") as f:
|
||||||
|
# f.write(repr(status))
|
||||||
|
# f.write("\n\n")
|
||||||
|
|
||||||
|
rt = getattr(status, "retweeted_status", None)
|
||||||
|
|
||||||
|
if not rt:
|
||||||
|
print("********", status.id)
|
||||||
|
self.seen_tweets.add(status.id)
|
||||||
|
text = status.extended_tweet["full_text"] if status.truncated else status.text
|
||||||
|
text = html.unescape(text)
|
||||||
|
#print("[{}] New tweet from @{}: {}".format(self.title, status.user.screen_name, text))
|
||||||
|
header = format_header(status.user.screen_name, status.user.name, status.created_at)
|
||||||
|
print(header)
|
||||||
|
print(text)
|
||||||
|
if self.printer:
|
||||||
|
try:
|
||||||
|
encoded_header = header.encode("latin-1")
|
||||||
|
encoded_text = text.encode("latin-1")
|
||||||
|
self.printer.write(encoded_header)
|
||||||
|
self.printer.write(encoded_text, True, True)
|
||||||
|
self.printer.flush()
|
||||||
|
except UnicodeEncodeError:
|
||||||
|
print("Can't print. Encoding issue.")
|
||||||
|
else:
|
||||||
|
if self.ignore_rt:
|
||||||
|
#print("******** Ignoring Retweet")
|
||||||
|
return True
|
||||||
|
if rt.id in self.seen_tweets:
|
||||||
|
print("********", rt.id, "Already seen this retweeted before")
|
||||||
|
return True
|
||||||
|
print("********", rt.id)
|
||||||
|
self.seen_tweets.add(rt.id)
|
||||||
|
rt_text = rt.extended_tweet["full_text"] if rt.truncated else rt.text
|
||||||
|
rt_text = html.unescape(rt_text)
|
||||||
|
#print("[{}] New retweet by @{}:".format(self.title, status.user.screen_name))
|
||||||
|
#print("@{}: {}".format(rt.user.screen_name, rt_text))
|
||||||
|
header = format_header(status.user.screen_name, status.user.name, status.created_at, is_rt=True)
|
||||||
|
rt_header = format_header(rt.user.screen_name, rt.user.name, rt.created_at)
|
||||||
|
print(header)
|
||||||
|
print(rt_header)
|
||||||
|
print(rt_text)
|
||||||
|
return True
|
||||||
|
|
||||||
|
def on_error(self, status_code):
|
||||||
|
if status_code == 420:
|
||||||
|
return False
|
||||||
|
|
||||||
|
"""
|
||||||
|
------------------------------------------------
|
||||||
|
@handle (name) xxxx-xx-xx xx:xx:xx
|
||||||
|
RT by @handle (name) xxxx-xx-xx xx:xx:xx
|
||||||
|
----------------------------
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def check_rate_limits(api):
|
||||||
|
rate_limits = api.rate_limit_status()
|
||||||
|
for category, endpoints in rate_limits["resources"].items():
|
||||||
|
for endpoint, data in endpoints.items():
|
||||||
|
limit = int(data["limit"])
|
||||||
|
remaining = int(data["remaining"])
|
||||||
|
reset = data["reset"]
|
||||||
|
if limit!=remaining and endpoint not in ["/application/rate_limit_status"]:
|
||||||
|
print(endpoint, f"{remaining}/{limit}",
|
||||||
|
"Reset in:", datetime.datetime.fromtimestamp(reset)-datetime.datetime.now())
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(description="Prints tweets to a thermal printer")
|
||||||
|
parser.add_argument("-p", "--printer", type=str, metavar="printer",
|
||||||
|
help="Which thermal printer to use (e.g. /dev/usb/lp1). If empty nothing will be printed.")
|
||||||
|
rt_handling = parser.add_mutually_exclusive_group()
|
||||||
|
rt_handling.add_argument("-n", "--no-retweets", action="store_true", default=None,
|
||||||
|
help="Don't show retweets (default when monitoring hashtags)")
|
||||||
|
rt_handling.add_argument("-r", "--retweets", action="store_true", default=None,
|
||||||
|
help="Show retweets (default when showing user timeline)")
|
||||||
|
parser.add_argument(metavar="keyword", nargs="*", type=str, dest="keywords",
|
||||||
|
help="What to track. You can specify hashtags or keywords here. If empty the user timeline will be monitored instead")
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
config = load_config()
|
||||||
|
|
||||||
|
consumer_key = config["auth"]["consumer"]["key"]
|
||||||
|
consumer_secret = config["auth"]["consumer"]["secret"]
|
||||||
|
|
||||||
|
access_token = config["auth"]["access"]["token"]
|
||||||
|
access_secret = config["auth"]["access"]["secret"]
|
||||||
|
|
||||||
|
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
|
||||||
|
auth.set_access_token(access_token, access_secret)
|
||||||
|
|
||||||
|
api = tweepy.API(auth)
|
||||||
|
|
||||||
|
check_rate_limits(api)
|
||||||
|
|
||||||
|
show_rts = None
|
||||||
|
if len(args.keywords) == 0:
|
||||||
|
show_rts = False if args.no_retweets else True
|
||||||
|
else:
|
||||||
|
show_rts = True if args.retweets else False
|
||||||
|
|
||||||
|
stream_name = ",".join(args.keywords) if args.keywords else "Timeline"
|
||||||
|
|
||||||
|
def start_streaming(listener, api):
|
||||||
|
stream = tweepy.Stream(listener=listener, auth=api.auth)
|
||||||
|
if args.keywords:
|
||||||
|
stream.filter(track=args.keywords)
|
||||||
|
else:
|
||||||
|
stream.userstream()
|
||||||
|
|
||||||
|
try:
|
||||||
|
if args.printer:
|
||||||
|
with Printer(args.printer) as thermal:
|
||||||
|
logger.info("Streaming {} to thermal printer {}...".format(stream_name, thermal.port))
|
||||||
|
listener = StreamListener(stream_name, ignore_rt=not show_rts, printer=thermal)
|
||||||
|
start_streaming(listener, api)
|
||||||
|
else:
|
||||||
|
logger.info("Streaming {}...".format(stream_name))
|
||||||
|
listener = StreamListener(stream_name, ignore_rt=not show_rts)
|
||||||
|
start_streaming(listener, api)
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
|
@ -0,0 +1,2 @@
|
||||||
|
toml==0.9.4
|
||||||
|
tweepy==3.6.0
|
Loading…
Reference in New Issue