import json
import sys


class ComplexEncoder(json.JSONEncoder):
    """JSON encoder that understands JSONSerializable objects.

    Any object implementing the JSONSerializable interface is replaced by the
    value returned from its ``to_json`` method; every other unsupported object
    is passed to the base class, which raises ``TypeError``.
    """

    def default(self, obj):
        if isinstance(obj, JSONSerializable):
            return obj.to_json()
        # Let the base class default method raise the TypeError
        return json.JSONEncoder.default(self, obj)


class JSONSerializable(object):
    """Interface for JSON-serializable classes.

    Subclasses override ``to_json``; ComplexEncoder encodes whatever value it
    returns in place of the object itself.
    """

    def to_json(self):
        raise NotImplementedError("Must implement 'to_json'")


class Counter(object):
    """Mutable item count shared by a stream and all of its substreams."""

    def __init__(self):
        self.count = 0

    def increment(self):
        self.count += 1

    def clear(self):
        self.count = 0


class Stream(JSONSerializable):
    """Buffers serialized objects and prints them in fixed-size chunks.

    The overall output is a single JSON array:
    ``start`` writes the opening ``[``.
    Every ``flush`` of the root stream appends one or more comma-separated
    elements.
    ``stop`` writes the closing ``]``.

    Streams can have substreams that are keyed on a name (see ``new_stream``).
    Substreams share one Counter, so ``buf_size`` bounds the number of items
    buffered across the whole tree. Reaching the limit flushes the root.
    Each non-empty substream is emitted as an element ``{"<name>": [...]}``.

    NOTE: ``to_json`` returns a raw JSON text fragment rather than an
    encodable value. A Stream placed inside buffered data would therefore be
    encoded by ComplexEncoder as a quoted string.
    """

    def __init__(self, out, buf_size=10, counter=None, root=None):
        """
        :param out: File-like object providing ``write`` and ``flush``
        :param buf_size: Item count (across the tree) that triggers a flush
        :param counter: Shared Counter; a new one is created when omitted
        :param root: Root stream of the tree; defaults to this stream
        """
        if counter is None:
            counter = Counter()
        if root is None:
            root = self
        self.buf_size = buf_size
        self.out = out
        self.counter = counter
        self.root = root
        self.data = {}
        self.substreams = {}
        # True until the root has written its first element.
        # Later root flushes need a separating comma.
        self.first = True

    def get_start(self):
        """:return: The text that opens this stream's JSON array"""
        return '['

    def start(self):
        """Writes the opening bracket of the stream to ``self.out``."""
        self.out.write(self.get_start())

    def flush(self):
        """Writes the buffered contents of the whole stream tree and clears it.

        Only the root stream writes. Calling this on a substream does nothing.
        Elements from successive flushes are separated by commas, so the text
        written between ``start`` and ``stop`` forms one valid JSON array.
        """
        if self.root is not self:
            return
        body = self.to_json()
        if body:
            pieces = []
            # Comma only between flushes of the root; to_json itself never
            # emits a leading comma.
            self.add_comma_if_not_first(pieces)
            pieces.append(body)
            self.out.write(''.join(pieces))
        self.clear()
        self.out.flush()

    def get_stop(self):
        """:return: The text that closes this stream's JSON array"""
        return ']'

    def stop(self):
        """Flushes any buffered data, then writes the closing bracket.

        On a substream the flush is a no-op, but the bracket is still written.
        """
        self.flush()
        self.out.write(self.get_stop())

    def clear(self):
        """Clears the shared counter and recursively clears all substreams."""
        self.counter.clear()
        self.data = {}
        for substream in self.substreams.values():
            substream.clear()

    def output(self, stream_name, obj):
        """Stores an object in the buffer.

        If the number of items buffered across the tree reaches ``buf_size``,
        the root stream is flushed automatically.

        :param stream_name: The key under which to store the object
        :param obj: The object (anything ComplexEncoder can encode)
        """
        self.data.setdefault(stream_name, []).append(obj)
        self.counter.increment()
        if self.counter.count >= self.buf_size:
            self.root.flush()

    def new_stream(self, name):
        """Creates a substream registered under ``name``.

        The substream shares this stream's ``out``, ``buf_size``, counter and
        root. Its items therefore count toward the common limit and are
        written by the root's flush. An existing substream with the same name
        is replaced.

        :param name: Key under which the substream is emitted
        :return: The new substream
        """
        s = Stream(self.out, self.buf_size, counter=self.counter,
                   root=self.root)
        self.substreams[name] = s
        return s

    def is_empty(self):
        """Determine whether this stream and all of its substreams are empty.

        :return: Whether this stream is empty
        """
        return not self.data and all(
            substream.is_empty() for substream in self.substreams.values())

    def add_comma_if_not_first(self, s):
        """Adds a comma to the list 's' unless nothing has been emitted yet.

        On the first call the flag is cleared instead.

        :param s: The list in which to add the comma
        """
        if self.first:
            self.first = False
        else:
            s.append(',')

    def to_json(self):
        """Converts the stream to JSON.

        The result excludes the opening and closing square brackets and any
        leading comma. ``flush`` adds those for the root stream.
        This method has no side effects, so ``str``/``repr`` are safe to call.

        :return: Comma-separated JSON elements, or '' if the stream is empty
        """
        return ','.join(self._json_parts())

    def _json_parts(self):
        """Returns the JSON text of each element this stream contributes."""
        parts = []
        if self.data:
            parts.append(json.dumps(self.data, cls=ComplexEncoder))
        for key, stream in self.substreams.items():
            if stream.is_empty():
                continue
            # json.dumps escapes quotes/backslashes in the name.
            # str() keeps non-string names valid as JSON object keys.
            parts.append('{{{}:{}{}{}}}'.format(
                json.dumps(str(key)),
                stream.get_start(),
                stream.to_json(),
                stream.get_stop(),
            ))
        return parts

    def __str__(self):
        return self.to_json()

    def __repr__(self):
        return str(self)


def main():
    s = Stream(sys.stdout, 4)
    s.start()
    for i in range(7):
        proc = {"pid": i + 1, "name": "init"}
        s.output("things", proc)
    q = s.new_stream('q')
    q.output("test", 'potato')
    for i in range(10):
        proc = {"pid": i + 1, "name": "init"}
        s.output("processes", proc)
        q.output("test", 'salad')
        q.output("test", 'rocks')
    s.flush()
    s.stop()


if __name__ == '__main__':
    main()