This repository has been archived on 2015-04-30. You can view files and clone it, but cannot push or open issues or pull requests.
stream/python/jsh/streams.py
2015-04-29 17:24:11 -04:00

172 lines
5.1 KiB
Python

import json
import sys
class ComplexEncoder(json.JSONEncoder):
    """JSON encoder that knows about ``JSONSerializable`` objects.

    Any ``JSONSerializable`` instance is replaced by the value returned from
    its ``to_json`` method, and the encoder then encodes that value. Every
    other type the standard encoder cannot handle is passed to the base
    implementation, which raises ``TypeError``.
    """

    def default(self, obj):
        if not isinstance(obj, JSONSerializable):
            # Defer to the stock behaviour: raise TypeError for unknown types.
            return json.JSONEncoder.default(self, obj)
        # NOTE(review): a str returned by to_json is encoded as a JSON
        # string (quoted), not spliced in as raw JSON. Confirm this is the
        # intended contract for implementers.
        return obj.to_json()
class JSONSerializable(object):
    """Interface for objects that can serialize themselves.

    Subclasses must override ``to_json``. ``ComplexEncoder`` calls it for
    every instance of this class it is asked to encode.
    """

    def to_json(self):
        """Return this object's JSON representation.

        :raises NotImplementedError: always, unless a subclass overrides it
        """
        message = "Must implement 'to_json'"
        raise NotImplementedError(message)
class Counter(object):
    """Mutable integer count.

    Substreams hold references to one shared instance, so an increment made
    through any of them is visible to all of them.
    """

    def __init__(self):
        self.count = 0

    def increment(self):
        """Add one to the count."""
        self.count = self.count + 1

    def clear(self):
        """Reset the count to zero."""
        self.count = 0
class Stream(JSONSerializable):
    """Buffers objects and writes them to ``out`` as one JSON array, in chunks.

    On the root stream:

    - ``start()`` writes the opening ``[``.
    - Each ``flush()`` appends the currently buffered contents as
      comma-separated array elements.
    - ``stop()`` flushes and then writes the closing ``]``.

    Objects passed to ``output`` are grouped by name in ``self.data``. On
    each flush that dict is emitted as a single JSON object.

    Substreams created with ``new_stream`` share the root's ``out``,
    ``buf_size``, counter and root. On each flush, every non-empty substream
    is emitted as ``{"<name>": [ ... ]}``. The inner array holds only that
    substream's contents for this flush. Once the shared counter reaches
    ``buf_size``, the root stream is flushed automatically.
    """

    def __init__(self, out, buf_size=10, counter=None, root=None):
        """
        :param out: File-like object with ``write`` and ``flush`` methods
        :param buf_size: Number of buffered items, summed across the root
            and all substreams, that triggers an automatic flush
        :param counter: Shared ``Counter``; a new one is created if omitted
        :param root: Root stream of the tree; defaults to this stream
        """
        if counter is None:
            counter = Counter()
        if root is None:
            root = self
        self.buf_size = buf_size
        self.out = out
        self.counter = counter
        self.root = root
        self.data = {}
        self.substreams = {}
        # True until to_json() has produced this stream's first non-empty
        # fragment. Later fragments need a leading comma.
        self.first = True

    def get_start(self):
        """Return the token that opens this stream's JSON array."""
        return '['

    def start(self):
        """Mark the start of the Stream by writing the opening bracket
        to ``out``."""
        self.out.write(self.get_start())

    def flush(self):
        """Write the buffered contents of the whole stream tree to ``out``,
        clear all buffers and flush ``out``.

        Does nothing unless called on the root stream.
        """
        if self.root is self:
            self.out.write(self.to_json())
            self.clear()
            self.out.flush()

    def get_stop(self):
        """Return the token that closes this stream's JSON array."""
        return ']'

    def stop(self):
        """Mark the end of the Stream.

        Flushes any remaining buffered contents, then writes the closing
        bracket to ``out``.
        """
        self.flush()
        self.out.write(self.get_stop())

    def clear(self):
        """Reset the shared counter and empty the buffered data of this
        stream and, recursively, of all its substreams."""
        self.counter.clear()
        self.data = {}
        for substream in self.substreams.values():
            substream.clear()

    def output(self, stream_name, obj):
        """Buffer ``obj`` under ``stream_name``.

        Flushes the root stream once the shared counter reaches
        ``buf_size``.

        :param stream_name: Key under which ``obj`` is grouped in this
            stream's buffered JSON object
        :param obj: Object to buffer; must be encodable by ``ComplexEncoder``
        """
        self.data.setdefault(stream_name, []).append(obj)
        self.counter.increment()
        if self.counter.count >= self.buf_size:
            self.root.flush()

    def new_stream(self, name):
        """Create a substream and register it under ``name``.

        The substream shares this stream's ``out``, ``buf_size``, counter
        and root. Nothing is written to ``out``. An existing substream
        registered under the same ``name`` is replaced.

        :param name: Key under which the substream is emitted
        :return: The new substream
        """
        substream = Stream(self.out, self.buf_size,
                           counter=self.counter, root=self.root)
        self.substreams[name] = substream
        return substream

    def is_empty(self):
        """Return whether this stream and all its substreams, recursively,
        have no buffered data."""
        return not self.data and all(
            substream.is_empty() for substream in self.substreams.values())

    def add_comma_if_not_first(self, s):
        """Append a comma to the list ``s`` unless this is the first element
        to be emitted; in that case clear the ``first`` flag instead.

        Retained for backward compatibility; serialization no longer uses it.

        :param s: The list in which to add the comma
        """
        if self.first:
            self.first = False
        else:
            s.append(',')

    def to_json(self):
        """Serialize the buffered contents as a fragment of a JSON array.

        The fragment excludes the enclosing square brackets, which
        ``start``/``stop`` write. It is prefixed with a comma unless it is
        the first non-empty fragment this stream has produced, so successive
        results can be written back to back inside one array.

        :return: JSON fragment, or an empty string if nothing is buffered
        """
        fragment = self._render(self.first)
        if fragment:
            self.first = False
        return fragment

    def _render(self, first):
        """Build this stream's JSON array fragment without modifying state.

        :param first: Whether the fragment is the first element of its
            enclosing array, i.e. whether to omit the leading comma
        :return: JSON fragment, or an empty string if nothing is buffered
        """
        elements = []
        if self.data:
            elements.append(json.dumps(self.data, cls=ComplexEncoder))
        for key, stream in self.substreams.items():
            if stream.is_empty():
                continue
            # Each flush wraps a substream's contents in a fresh array, so
            # its fragment never takes a leading comma. json.dumps escapes
            # quotes and backslashes in the key.
            elements.append('{' + json.dumps(str(key)) + ':'
                            + stream.get_start()
                            + stream._render(True)
                            + stream.get_stop() + '}')
        if not elements:
            return ''
        body = ','.join(elements)
        return body if first else ',' + body

    def __str__(self):
        # Side-effect free: printing or inspecting a stream (e.g. in a
        # debugger) must not flip ``self.first`` and corrupt the next flush.
        return self._render(True)

    def __repr__(self):
        return str(self)