tweet-printer/printer.py

370 lines
13 KiB
Python
Raw Normal View History

2018-03-18 15:56:49 +00:00
#!/usr/bin/env python3
"""
TODO: Pseudo Retweets (aka Commments). Tweets in which the only URL is to another Tweet.
2018-03-18 15:56:49 +00:00
TODO: Figure out if/when to show unshortened URLs in links contained in Tweets.
TODO: Figure out bitmap mode so stuff that is not Latin-1 encodable can still be printed.
2018-03-18 15:56:49 +00:00
"""
__version__ = "0.0.2"
2018-03-18 15:56:49 +00:00
import argparse
import datetime
import html
import logging
import sys
import textwrap
2018-03-18 15:56:49 +00:00
import toml
import tweepy
2018-05-11 08:40:35 +00:00
from PIL import Image, ImageDraw, ImageFont
2018-03-18 15:56:49 +00:00
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("tweet-printer")
2018-05-11 08:40:35 +00:00
printer_line_width = 52
2018-03-18 17:56:52 +00:00
2018-03-18 15:56:49 +00:00
def load_config():
fn = "config.toml"
cfg = {}
try:
with open(fn, "r") as f:
cfg = toml.load(f)
except FileNotFoundError:
logger.error(f"Can't open config file {fn}")
sys.exit(1)
except toml.TomlDecodeError as ex:
logger.error(f"Can't parse config file {fn}: {ex}")
sys.exit(1)
if "auth" not in cfg:
logger.error("Can't find auth section in configuration data.")
sys.exit(1)
authdata = cfg["auth"]
if "consumer" not in authdata:
logger.error("Can't find consumer auth section in configuration data.")
sys.exit(1)
consumerdata = authdata["consumer"]
if "key" not in consumerdata or not consumerdata["key"]:
logger.error("Can't find consumer key in configuration data.")
sys.exit(1)
if "secret" not in consumerdata or not consumerdata["secret"]:
logger.error("Can't find consumer secret in configuration data.")
sys.exit(1)
if "access" not in authdata:
logger.error("Can't find access auth section of configuration data.")
sys.exit(1)
accessdata = authdata["access"]
if "token" not in accessdata:
logger.error("Can't find access token in configuration data.")
sys.exit(1)
if "secret" not in accessdata:
logger.error("Can't find access secret in configuration data.")
sys.exit(1)
return cfg
def format_header(handle, name, dt, is_rt=None):
2018-03-18 17:56:52 +00:00
space_for_datetime = 20
space = printer_line_width-space_for_datetime
2018-03-18 15:56:49 +00:00
header = ""
if is_rt is None:
is_rt = False
if is_rt:
header += "RT by "
2018-03-18 17:56:52 +00:00
header += f"@{handle} "[:space]
space_for_name = space-len(header)-2
2018-03-18 15:56:49 +00:00
shortened_by=0
if len(name)<=space_for_name:
header+=f"({name})"
else:
shortened_name_len = space_for_name-3
if shortened_name_len>0:
shortened_by = len(name)-shortened_name_len
header+=f"({name[:shortened_name_len]}...)"
2018-03-18 17:56:52 +00:00
header = header.ljust(space)
2018-03-18 15:56:49 +00:00
header += dt.strftime(" %Y-%m-%d %H:%M:%S")
return header
class Printer:
def __init__(self, port, **kwargs):
2018-03-18 15:56:49 +00:00
self.port = port
for arg, default in [("flipped", False)]:
setattr(self, arg, kwargs.get(arg, default))
2018-03-18 15:56:49 +00:00
def __enter__(self):
self.printer = open(self.port, "wb")
self.printer.write(bytes([0x1b, 0x40])) # Initialize printer
self.printer.write(bytes([0x1b, 0x74, 0x06])) # Set character code table to CP1252/Latin-1
if self.flipped:
self.printer.write(bytes([0x1b, 0x7b, 0x01])) # Set printer to upside-down mode
2018-03-18 15:56:49 +00:00
return self
def __exit__(self, exc_type, exc_value, traceback):
self.printer.close()
def write(self, data, newline=True, cut=False):
if type(data) is str:
data = data.encode("latin-1")
elif type(data) is not bytes:
logger.error("Unsupported type for Printer.write: "+str(type(data)))
return
self.printer.write(data)
if newline:
self.printer.write(bytes([13, 10])) # CR, LF
2018-03-18 15:56:49 +00:00
if cut:
self.printer.write(bytes([0x1d, 0x56, # Feed and Cut
0x42, # Feed to cut position with extra length and cut
10 # extra*Y Unit exta space
2018-03-18 15:56:49 +00:00
]))
2018-05-11 08:40:35 +00:00
def write_image(self, img):
def size_for_printer(w,h):
return [math.ceil(w/8), (w//8)//256, h%256, h//256]
w, h = img.size
if w!=576:
logger.error("Currently images with width other than 576 can't be printed")
return
data = [0x1d, 0x76, 0x30, 0x00] + size_for_printer(*img.size)
for y in range(img.height):
current_byte = 0
current_bit = 0
for x in range(img.width):
l = img.getpixel((x, y))
to_add = 1<<(7-current_bit)
current_byte += to_add if l<127 else 0
current_bit += 1
if current_bit>7:
data.append(current_byte)
current_bit = 0
current_byte = 0
if current_bit != 0:
data.append(current_byte)
self.printer.write(bytes(data))
2018-03-18 15:56:49 +00:00
def flush(self):
self.printer.flush()
2018-05-11 08:40:35 +00:00
class ImageRenderer:
def __init__(self, font_file, font_size=None):
self.font = ImageFont.truetype(font_file, font_size or 18)
self.line_height = self.font.getsize("x")[1]
self.line_spacing = 4
def render_header(self, handle, name, dt, is_rt=None):
h = self.font.getsize("0@"+handle+name)[1] # Figure out, how high this line will be
w = 576
if is_rt is None:
is_rt = False
handle_text = f"@{handle} "
if is_rt:
handle_text = "RT by "+handle_text
name_text = f"({name})"
dt_text = dt.strftime(" %Y-%m-%d %H:%M:%S")
space_for_dt = self.font.getsize(dt_text)
space_for_handle = self.font.getsize(handle_text)
space_for_name = self.font.getsize(name_text)
print(space_for_handle, space_for_name, space_for_dt)
w_left_for_name = w-space_for_handle[0]-space_for_dt[0]
while len(name_text) and space_for_name[0]>w_left_for_name:
name_text = name_text[-2]+"\u2026)"
space_for_name = self.font.getsize(name_text)
print(handle_text, name_text, dt_text, sep="")
img = Image.new("1", (576, h), 0)
draw = ImageDraw.Draw(img)
draw.text((0,0), handle_text, font=self.font, fill=1)
draw.text((space_for_handle[0], 0), name_text, font=self.font, fill=1)
draw.text((w-space_for_dt[0], 0), dt_text, font=self.font, fill=1)
return img
def render(self, lines):
spacing = self.line_height + self.line_spacing
img = Image.new("1", (576, len(lines)*spacing), 0)
draw = ImageDraw.Draw(img)
for i,line in enumerate(lines):
draw.text((0, i*spacing), line, font=self.font, fill=1)
return img
2018-03-18 15:56:49 +00:00
class StreamListener(tweepy.StreamListener):
def __init__(self, title, ignore_rt=False, printer=None):
super().__init__()
self.title = title
self.seen_tweets = set()
self.ignore_rt = ignore_rt
self.printer = printer
@classmethod
2018-05-11 08:40:35 +00:00
def split(cls, text, extra_lines=[]):
lines = extra_lines
for line in text.splitlines(True):
stripped = line.strip()
if stripped:
lines += textwrap.wrap(stripped, printer_line_width)
else:
lines.append(stripped)
2018-05-11 08:40:35 +00:00
return lines
@classmethod
def split_and_encode_text(cls, text, extra_lines=[]):
return [line.encode("latin-1") for line in split(text, extra_lines)]
def print_tweet(self, text, headers):
try:
encoded_lines = self.split_and_encode_text(text, headers)
if self.printer.flipped:
encoded_lines.reverse()
for line in encoded_lines:
self.printer.write(line)
self.printer.write("", False, True)
self.printer.flush()
except UnicodeEncodeError:
print("Can't print. Encoding issue.")
2018-03-18 15:56:49 +00:00
def on_status(self, status):
if status.id in self.seen_tweets:
print("********", status.id, "Already seen this tweet before")
return True
#with open("status.log", "at+") as f:
# f.write(repr(status))
# f.write("\n\n")
rt = getattr(status, "retweeted_status", None)
if not rt:
print("********", status.id)
self.seen_tweets.add(status.id)
text = status.extended_tweet["full_text"] if status.truncated else status.text
text = html.unescape(text)
header = format_header(status.user.screen_name, status.user.name, status.created_at)
print(header)
print(text)
2018-05-11 08:40:35 +00:00
r = ImageRenderer("fonts/DejaVuSansMono.ttf")
r.render([header]+self.split(text)).show()
return True
2018-03-18 15:56:49 +00:00
if self.printer:
self.print_tweet(text, [header])
2018-03-18 15:56:49 +00:00
else:
if self.ignore_rt:
#print("******** Ignoring Retweet")
return True
if rt.id in self.seen_tweets:
print("********", rt.id, "Already seen this retweeted before")
return True
print("********", rt.id)
self.seen_tweets.add(rt.id)
rt_text = rt.extended_tweet["full_text"] if rt.truncated else rt.text
rt_text = html.unescape(rt_text)
header = format_header(status.user.screen_name, status.user.name, status.created_at, is_rt=True)
rt_header = format_header(rt.user.screen_name, rt.user.name, rt.created_at)
print(header)
print(rt_header)
print(rt_text)
if self.printer:
self.print_tweet(rt_text, [header, rt_header])
2018-03-18 15:56:49 +00:00
return True
def on_error(self, status_code):
if status_code == 420:
return False
def check_rate_limits(api):
rate_limits = api.rate_limit_status()
for category, endpoints in rate_limits["resources"].items():
for endpoint, data in endpoints.items():
limit = int(data["limit"])
remaining = int(data["remaining"])
reset = data["reset"]
if limit!=remaining and endpoint not in ["/application/rate_limit_status"]:
print(endpoint, f"{remaining}/{limit}",
"Reset in:", datetime.datetime.fromtimestamp(reset)-datetime.datetime.now())
def main():
parser = argparse.ArgumentParser(description="Prints tweets to a thermal printer")
parser.add_argument("-p", "--printer", type=str, metavar="printer",
help="Which thermal printer to use (e.g. /dev/usb/lp1). If empty nothing will be printed.")
rt_handling = parser.add_mutually_exclusive_group()
rt_handling.add_argument("-n", "--no-retweets", action="store_true", default=None,
help="Don't show retweets (default when monitoring hashtags)")
rt_handling.add_argument("-r", "--retweets", action="store_true", default=None,
help="Show retweets (default when showing user timeline)")
parser.add_argument(metavar="keyword", nargs="*", type=str, dest="keywords",
help="What to track. You can specify hashtags or keywords here. If empty the user timeline will be monitored instead")
args = parser.parse_args()
config = load_config()
printer_config = config.get("printer", {})
flip_printer = printer_config.get("flipped", False)
2018-03-18 15:56:49 +00:00
consumer_key = config["auth"]["consumer"]["key"]
consumer_secret = config["auth"]["consumer"]["secret"]
access_token = config["auth"]["access"]["token"]
access_secret = config["auth"]["access"]["secret"]
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)
api = tweepy.API(auth)
check_rate_limits(api)
stream_name = ",".join(args.keywords) if args.keywords else "Timeline"
2018-03-18 15:56:49 +00:00
show_rts = None
if len(args.keywords) == 0:
show_rts = False if args.no_retweets else True
else:
show_rts = True if args.retweets else False
stream_name += " (including Retweets)" if show_rts else " (without Retweets)"
2018-03-18 15:56:49 +00:00
def start_streaming(listener, api):
stream = tweepy.Stream(listener=listener, auth=api.auth)
if args.keywords:
stream.filter(track=args.keywords)
else:
stream.userstream()
try:
if args.printer:
with Printer(args.printer, flipped=flip_printer) as thermal:
2018-03-18 15:56:49 +00:00
logger.info("Streaming {} to thermal printer {}...".format(stream_name, thermal.port))
listener = StreamListener(stream_name, ignore_rt=not show_rts, printer=thermal)
start_streaming(listener, api)
else:
logger.info("Streaming {}...".format(stream_name))
listener = StreamListener(stream_name, ignore_rt=not show_rts)
start_streaming(listener, api)
except KeyboardInterrupt:
pass
2018-05-11 08:40:35 +00:00
def test():
r = ImageRenderer("fonts/DejaVuSans.ttf", font_size=20)
r.render_header("trilader", "Daniel 🙏", datetime.datetime.now()).show()
2018-03-18 15:56:49 +00:00
if __name__ == "__main__":
2018-05-11 08:40:35 +00:00
#main()
test()