pypond package

Submodules

pypond.bases module

Common base classes and mixins.

class pypond.bases.Observable

Bases: pypond.bases.PypondBase

Base class for objects in the processing chain which need other objects to listen to them. It provides a basic interface to define the relationships and to emit events to the interested observers.

add_observer(observer)

add an observer if it does not already exist.

emit(event)

add event to observers.

flush()

flush observers.

has_observers()

does the object have observers?
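The listener relationship described above can be sketched in plain Python. This is only an illustration of the pattern, not pypond's implementation: the `SketchObservable` and `Recorder` classes, and the observer-side `add_event()` and `flush()` methods, are assumptions made for this example.

```python
class SketchObservable:
    """Hypothetical stand-in that mirrors the four documented methods."""

    def __init__(self):
        self._observers = []

    def add_observer(self, observer):
        # Add an observer only if it is not already registered.
        if observer not in self._observers:
            self._observers.append(observer)

    def has_observers(self):
        return len(self._observers) > 0

    def emit(self, event):
        # Hand the event to every registered observer.
        for observer in self._observers:
            observer.add_event(event)

    def flush(self):
        for observer in self._observers:
            observer.flush()


class Recorder:
    """Hypothetical observer that just remembers what it was given."""

    def __init__(self):
        self.seen = []
        self.flushed = False

    def add_event(self, event):
        self.seen.append(event)

    def flush(self):
        self.flushed = True


source = SketchObservable()
sink = Recorder()
source.add_observer(sink)
source.add_observer(sink)  # ignored: already registered
source.emit("event-1")
source.flush()
```

Registering the same observer twice has no effect, so each emitted event reaches it once.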

class pypond.bases.PypondBase

Bases: object

Universal base class. Used to provide common functionality (logging, etc) to all the other classes.

pypond.bases.setup_log(log_path=None)

Usage:

_log('main.start', 'happy simple log event')
_log('launch', 'more={0}, complex={1} log=event'.format(100, 200))

pypond.collection module

Implementation of Pond Collection class.

class pypond.collection.Collection(instance_or_list=None, copy_events=True)

Bases: pypond.io.input.Bounded

A collection is a list of Events. You can construct one out of either another collection, or a list of Events. You can add_event() to a collection and a new collection will be returned.

Basic operations on the list of events are also possible. You can iterate over the collection with a for loop over events(), get the size() of the collection and access a specific element with at().

Initialize from copy, lists, etc.

instance_or_list arg can be:

  • a Collection object (copy ctor)
  • a python list
  • a pyrsistent.pvector

The list and pvector will contain Events.

Parameters:
  • instance_or_list (list, Collection, pyrsistent.pvector) – A collection object to copy or a list of Event objects
  • copy_events (bool, optional) – Copy event list when using copy constructor, otherwise the new object has an empty event list.
__str__()

call to_string()

to_string() is already being tested so skip coverage.

Returns:String representation of the object.
Return type:str
add_event(event)

Add an event to the payload and return a new Collection object.

Parameters:event (Event) – Event object to add to collection.
Returns:New collection with the event added to it.
Return type:Collection
aggregate(func, field_path=None)

Aggregates the events down using a user defined function to do the reduction. Only a single column can be aggregated on so this takes a field_path, NOT a field_spec.

This is essentially a wrapper around map/reduce, constraining it to a single column and returning the value, not the dict from map().

Parameters:
  • func (function) – Function to pass to map reduce to aggregate.
  • field_path (str, list, tuple, None, optional) –

    Name of a single value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’

    If field_path is None, then [‘value’] will be the default.

Returns:

Returns the aggregated value, so it depends on what kind of data are being handled/aggregation being done.

Return type:

various

at(pos)

Returns an item in the collection by its index position.

Creates a new object via copy ctor.

Parameters:pos (int) – Index of the event to be retrieved.
Returns:A new Event object of the event at index pos
Return type:Event
Raises:CollectionException – Raised if there is an index error.
at_first()

Retrieve the first item in this collection.

Returns:An event instance.
Return type:Event
at_key(searchkey)

Returns a list of events in the Collection which have the exact key (time, timerange or index) specified by searchkey. Note that this is an O(n) search for the key specified, since collections are an unordered bag of events.

Parameters:searchkey (datetime, str, TimeRange) – The key of the event
Returns:List of all events at that key.
Return type:list
at_last()

Return the last event item in this collection.

Returns:An event instance.
Return type:Event
at_time(time)

Return an item by time. Primarily a utility method that sits in front of bisect() and fetches using at().

If you have events at 12:00 and 12:02 and you make the query at 12:01, the one at 12:00 will be returned. Otherwise it will return the exact match.

Parameters:time (datetime.datetime) – Datetime object at or after the time of the event to be returned. Must be an aware UTC datetime object.
Returns:Returns a new Event instance via at()
Return type:Event
avg(field_path=None, filter_func=None)

Get avg

Parameters:
  • field_path (str, list, tuple, None, optional) –

    Name of a single value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’

    If field_path is None, then [‘value’] will be the default.

  • filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns:

Average value.

Return type:

int or float
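The effect of filter_func on an aggregation can be sketched as follows. The two filters here (`ignore_missing` and `zero_missing`) and the standalone `avg()` are hypothetical stand-ins written for this example; they work on a plain list of values and are not the functions shipped in pypond.functions.Filters.

```python
import math


def is_missing(value):
    # Treat None, NaN and the empty string as missing, as described above.
    if value is None or value == "":
        return True
    return isinstance(value, float) and math.isnan(value)


def ignore_missing(values):
    # Hypothetical filter: drop missing values entirely.
    return [v for v in values if not is_missing(v)]


def zero_missing(values):
    # Hypothetical filter: replace missing values with zero.
    return [0 if is_missing(v) else v for v in values]


def avg(values, filter_func=None):
    if filter_func is not None:
        values = filter_func(values)
    return sum(values) / len(values)


raw = [1, 2, None, 3]
avg_ignored = avg(raw, ignore_missing)  # averages 1, 2, 3
avg_zeroed = avg(raw, zero_missing)     # averages 1, 2, 0, 3
```

Calling `avg(raw)` with no filter raises a TypeError in this sketch, which illustrates why retaining missing values can cause errors.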

bisect(dtime, b=0)

Finds the index that is just less than the time dtime supplied. In other words every event at the returned index or less has a time before the supplied dtime, and every event after the index has a time later than the supplied dtime.

Optionally supply a begin index to start searching from.

  • dtime - python datetime object to bisect the collection with; it will be made into an aware datetime in UTC.

  • b - position to start

Returns the index that is the greatest but still below dtime - see docstring for at_time()

Parameters:
  • dtime (datetime.datetime) – Datetime object at or after the time of the event to be returned. Must be an aware UTC datetime object.
  • b (int, optional) – Array index to start searching from
Returns:

The index of the searched-for event

Return type:

int

Raises:

CollectionException – Raised if given a naive or non-UTC dtime
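The lookup behaviour described for at_time() (a query at 12:01 returns the 12:00 event, an exact match returns itself) can be sketched with the standard library bisect module. This is an illustration over a sorted list of datetimes, not pypond's own search; the helper name `index_at_or_before` is hypothetical, and clamping early queries to index 0 is a choice made for this sketch.

```python
import bisect
from datetime import datetime, timezone


def index_at_or_before(times, dtime):
    """Index of the last entry in sorted `times` that is <= dtime."""
    if dtime.tzinfo is None:
        raise ValueError("dtime must be an aware datetime")
    # bisect_right returns the insertion point after any exact match,
    # so stepping back one position gives the entry at or before dtime.
    return max(bisect.bisect_right(times, dtime) - 1, 0)


times = [
    datetime(2016, 1, 1, 12, 0, tzinfo=timezone.utc),
    datetime(2016, 1, 1, 12, 2, tzinfo=timezone.utc),
]
between = index_at_or_before(
    times, datetime(2016, 1, 1, 12, 1, tzinfo=timezone.utc))
exact = index_at_or_before(
    times, datetime(2016, 1, 1, 12, 2, tzinfo=timezone.utc))
```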

clean(field_path=None)

Returns a new Collection by testing the field_path values for being valid (not NaN or None). The resulting Collection will be clean for that field_path.

Parameters:field_path (str, list, tuple, None, optional) –

Name of a single value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’

If field_path is None, then [‘value’] will be the default.

Returns:New collection containing only “clean” events.
Return type:Collection
collapse(field_spec_list, name, reducer, append=True)

Takes a field_spec_list (list of column names) and collapses them to a new column which is the reduction of the matched columns in the field_spec_list.

Parameters:
  • field_spec_list (list) – List of columns to collapse. To retrieve deeply nested values, use dotted paths such as [‘can.be’, ‘done.with’, ‘this.notation’].
  • name (str) – Name of new column containing collapsed data.
  • reducer (function) – Function to pass to reducer.
  • append (bool, optional) – Append collapsed column to existing data or make new events with only that column.
Returns:

New collection containing the collapsed data.

Return type:

Collection

count()

Get count - calls size()

Returns:Num events in the collection.
Return type:int
dedup()

Remove duplicates from the Collection. If duplicates exist in the collection with the same key but with different values, the later event values will be used.

Returns:A new collection without duplicates.
Return type:Collection
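The "later event values will be used" rule can be sketched with a dict keyed on the event key. This is a simplified illustration using (key, data) tuples rather than Event objects; the ordering of the result is a property of this sketch and may differ from pypond's.

```python
def dedup(events):
    """Keep one (key, data) pair per key; later entries win."""
    latest = {}
    for key, data in events:
        # Re-assigning keeps the key's original position (dicts preserve
        # insertion order) while replacing the stored data.
        latest[key] = data
    return list(latest.items())


events = [
    (1000, {"value": 1}),
    (2000, {"value": 2}),
    (1000, {"value": 3}),  # same key as the first entry, different value
]
unique = dedup(events)
```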
static equal(coll1, coll2)

Test to see if instances are the same instance.

Parameters:
  • coll1 (Collection) – A collection.
  • coll2 (Collection) – Another collection.
Returns:

True if same instance.

Return type:

bool

event_list()

Returns the raw Immutable event list.

Returns:Raw immutable event list.
Return type:pyrsistent.pvector
event_list_as_list()

Return a Python list of the event list.

Returns:Thawed version of internal immutable data structure.
Return type:list
event_list_as_map()

Return the events in the collection as a dict of lists where the key is the timestamp, index or timerange and the value is an array of events with that key.

Returns:Dict mapping each key to the list of events with that key.
Return type:dict
events()

Generator to allow for loops over series.events()

for i in series.events():
    do_stuff(i)
Returns:An iterator to loop over the events.
Return type:iterator
filter(func)

Filter the collection’s event list with the supplied function. The function will be passed each of the Event objects and return a boolean value. If True, then it will be included in the filter.

def is_even(event):
    return bool(event.get('some_value') % 2 == 0)

Would produce a new collection where ‘some_value’ is only even numbers.

Parameters:func (function) – Function to filter with.
Returns:New collection containing filtered events.
Return type:Collection
first(field_path=None, filter_func=None)

Get first value in the collection for the fspec

Parameters:
  • field_spec (str, list, tuple, None) – Column or columns to look up. To retrieve multiple deeply nested values, use a list of dotted paths such as [‘can.be’, ‘done.with’, ‘this.notation’]; a single deep value can be written as a string.like.this. If None, all columns will be operated on.
  • filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns:

Type varies depending on underlying data

Return type:

depends on data

is_chronological()

Checks that the events in this collection are in chronological order.

Returns:True if events are in chronological order.
Return type:bool
last(field_path=None, filter_func=None)

Get last value in the collection for the fspec

Parameters:
  • field_spec (str, list, tuple, None) – Column or columns to look up. To retrieve multiple deeply nested values, use a list of dotted paths such as [‘can.be’, ‘done.with’, ‘this.notation’]; a single deep value can be written as a string.like.this. If None, all columns will be operated on.
  • filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns:

Type varies depending on underlying data

Return type:

depends on data

map(func)

Map function. Apply function to the collection events and return a new Collection from the resulting events. Function must create a new Event* instance.

def in_only(event):
    # make new events with only data value "in".
    return Event(event.timestamp(), {'in': event.get('in')})
Parameters:func (function) – Mapper function
Returns:New collection containing mapped events.
Return type:Collection
max(field_path=None, filter_func=None)

Get max

Parameters:
  • field_path (str, list, tuple, None, optional) –

    Name of a single value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’

    If field_path is None, then [‘value’] will be the default.

  • filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns:

Maximum value.

Return type:

int or float

mean(field_path=None, filter_func=None)

Get mean

Parameters:
  • field_path (str, list, tuple, None, optional) –

    Name of a single value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’

    If field_path is None, then [‘value’] will be the default.

  • filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns:

Mean value.

Return type:

int or float

median(field_path=None, filter_func=None)

Get median

Parameters:
  • field_path (str, list, tuple, None, optional) –

    Name of a single value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’

    If field_path is None, then [‘value’] will be the default.

  • filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns:

Median value.

Return type:

int or float

min(field_path=None, filter_func=None)

Get min

Parameters:
  • field_path (str, list, tuple, None, optional) –

    Name of a single value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’

    If field_path is None, then [‘value’] will be the default.

  • filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns:

Minimum value.

Return type:

int or float

percentile(perc, field_path, method='linear', filter_func=None)

Gets percentile perc within the Collection. This works the same way as numpy.

Parameters:
  • perc (int) – The percentile (should be between 0 and 100)
  • field_path (str, list, tuple, None, optional) –

    Name of a single value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’

    If field_path is None, then [‘value’] will be the default.

  • method (str, optional) –

    Specifies the interpolation method to use when the desired percentile lies between two data points. Options are:

    linear: i + (j - i) * fraction, where fraction is the fractional part of the index surrounded by i and j.

    lower: i

    higher: j

    nearest: i or j whichever is nearest

    midpoint: (i + j) / 2

Returns:

The percentile.

Return type:

int or float
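The interpolation options listed above can be sketched in plain Python. This is an illustration of the rules as written, not pypond's or numpy's implementation; in particular, sending ties to the lower point for `nearest` is an assumption of this sketch, and the function works on a plain list of numbers rather than a Collection.

```python
import math


def percentile(values, perc, method="linear"):
    """Percentile of `values` using the interpolation rules listed above."""
    ordered = sorted(values)
    # Fractional position of the requested percentile in the sorted data.
    index = (perc / 100.0) * (len(ordered) - 1)
    lo, hi = math.floor(index), math.ceil(index)
    i, j = ordered[lo], ordered[hi]
    fraction = index - lo
    if method == "linear":
        return i + (j - i) * fraction
    if method == "lower":
        return i
    if method == "higher":
        return j
    if method == "nearest":
        # Ties (fraction == 0.5) go to the lower point in this sketch.
        return i if fraction <= 0.5 else j
    if method == "midpoint":
        return (i + j) / 2
    raise ValueError("unknown method: {0}".format(method))


data = [1, 2, 3, 4]
p25 = {m: percentile(data, 25, m)
       for m in ("linear", "lower", "higher", "nearest", "midpoint")}
```

For `data = [1, 2, 3, 4]` the 25th percentile falls at fractional index 0.75, between the values 1 and 2, so the five methods give different answers.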

quantile(num, field_path=None, method='linear')

Gets num quantiles within the Collection

Parameters:
  • num (int) – Number of quantiles to divide the Collection into.
  • field_path (None, optional) – The field to return as the quantile. If not set, defaults to ‘value.’
  • method (str, optional) –

    Specifies the interpolation method to use when the desired percentile lies between two data points. Options are:

    linear: i + (j - i) * fraction, where fraction is the fractional part of the index surrounded by i and j.

    lower: i

    higher: j

    nearest: i or j whichever is nearest

    midpoint: (i + j) / 2

Returns:

An array of quantiles

Return type:

list

range()

From the range of times, or Indexes within the TimeSeries, return the extents of the Collection/TimeSeries as a TimeRange.

Returns:Extents as time range.
Return type:TimeRange
static same(coll1, coll2)

Test to see if the collections have the same values.

Parameters:
  • coll1 (Collection) – A collection.
  • coll2 (Collection) – Another collection.
Returns:

True if same values.

Return type:

bool

set_events(events)

Create a new Collection from this one and set the internal list of events

Parameters:events (list or pyrsistent.pvector) – A list of events
Returns:Returns a new collection with the event list set to the events arg
Return type:Collection
Raises:CollectionException – Raised if wrong arg type.
size()

Number of items in collection.

Returns:Number of items in collection
Return type:int
size_valid(field_path=None)

Returns the number of valid items in this collection.

Uses the field_path to look up values in all events. It then counts the number that are considered valid, i.e. are not NaN or None.

Parameters:field_path (str, list, tuple, None, optional) –

Name of a single value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’

If field_path is None, then [‘value’] will be the default.

Returns:Number of valid <field_path> values in all of the Events.
Return type:int
slice(begin, end)

Perform a slice of events within the Collection, returns a new Collection representing a portion of this TimeSeries from begin up to but not including end. Uses typical python [slice:syntax].

Parameters:
  • begin (int) – Slice begin.
  • end (int) – Slice end.
Returns:

New collection with sliced payload.

Return type:

Collection

sort(field_path)

Sorts the Collection using the value referenced by field_path.

Parameters:field_path (str, list, tuple, None, optional) –

Name of a single value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’

If field_path is None, then [‘value’] will be the default.

Returns:New collection of sorted values.
Return type:Collection
sort_by_time()

Return a new instance of this collection after making sure that all of the events are sorted by timestamp.

Returns:A copy of this collection with the events chronologically sorted.
Return type:Collection
stdev(field_path=None, filter_func=None)

Get std dev

Parameters:
  • field_path (str, list, tuple, None, optional) –

    Name of a single value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’

    If field_path is None, then [‘value’] will be the default.

  • filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns:

Standard deviation.

Return type:

int or float

sum(field_path=None, filter_func=None)

Get sum

Parameters:
  • field_path (str, list, tuple, None, optional) –

    Name of a single value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’

    If field_path is None, then [‘value’] will be the default.

  • filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns:

Summed value.

Return type:

int or float

to_json()

Returns the collection as json object.

This is actually like json.loads(s) - produces the actual vanilla data structure.

Returns:A thawed list of Event objects.
Return type:list
to_string()

Returns the collection as a string, useful for serialization.

In JS land, this is synonymous with __str__ or __unicode__

Use custom object encoder because this is a list of Event* objects.

Returns:String representation of this object.
Return type:str
type()

Event object type.

The class of the type of events in this collection.

Returns:The class (not instance) of the type of events.
Return type:Event

pypond.event module

Implementation of the Pond Event classes.

http://software.es.net/pond/#/events

class pypond.event.Event(instance_or_time, data=None)

Bases: pypond.event.EventBase

A generic event. This represents a data object at a single timestamp, supplied at initialization.

The timestamp may be a python date object, datetime object, or ms since UNIX epoch. It is stored internally as a datetime object.

The data may be any type.

Asking the Event object for the timestamp with timestamp() returns a datetime object; key() returns the number of ms since the UNIX epoch as an integer. There’s no method on the Event object to mutate the Event timestamp after it is created.

The creation of an Event is done by combining two parts: the timestamp (or time range, or Index...) and the data.

To construct you specify the timestamp as either:

  • a python date or datetime object
  • millisecond timestamp: the number of ms since the UNIX epoch

To specify the data you can supply either:

  • a python dict
  • a pyrsistent.PMap created with pyrsistent.freeze(), or
  • a simple type such as an integer. In the case of the simple type this is a shorthand for supplying {“value”: v}.

If supplying a PMap for either of the args (rather than supplying a python dict and letting the Event class handle it which is preferred), create it with freeze() and not pmap(). This is because any nested dicts must similarly be made immutable and pmap() will only freeze the “outer” dict.
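The two directions of timestamp handling mentioned above (epoch milliseconds in, datetime stored internally) can be sketched with the standard library. The helper names `to_utc_datetime` and `to_epoch_ms` are hypothetical, the sketch covers only the integer and datetime cases, and rejecting naive datetimes is an assumption of this example rather than documented Event behaviour.

```python
from datetime import datetime, timezone


def to_utc_datetime(instance_or_time):
    """Hypothetical helper: normalize a timestamp argument to aware UTC."""
    if isinstance(instance_or_time, datetime):
        if instance_or_time.tzinfo is None:
            raise ValueError("naive datetimes are ambiguous; pass an aware one")
        return instance_or_time.astimezone(timezone.utc)
    if isinstance(instance_or_time, int):
        # Integer input is treated as milliseconds since the UNIX epoch.
        return datetime.fromtimestamp(instance_or_time / 1000.0, tz=timezone.utc)
    raise TypeError("unsupported timestamp type")


def to_epoch_ms(dtime):
    # Inverse direction: aware datetime back to integer milliseconds.
    return int(round(dtime.timestamp() * 1000))


stamp = to_utc_datetime(1429673400000)
round_trip = to_epoch_ms(stamp)
```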

Parameters:
  • instance_or_time (Event, pyrsistent.PMap, int, datetime.datetime) – An event for copy constructor, a fully formed and formatted immutable data payload, or an int (epoch ms) or a datetime.datetime object to create a timestamp from.
  • data (None, optional) – Could be dict/PMap/int/float/str to use for data payload.
static avg(events, field_spec=None, filter_func=None)

combine() called with an averaging function as a reducer.

Parameters:
  • events (list) – A list of Event objects
  • field_spec (list, str, None, optional) – Column or columns to look up. To retrieve multiple deeply nested values, use a list of dotted paths such as [‘can.be’, ‘done.with’, ‘this.notation’]; a single deep value can be written as a string.like.this. If None, all columns will be operated on.
  • filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns:

A list containing the averaged events.

Return type:

list

begin()

The begin time of this Event, which will be just the timestamp.

Returns:Datetime object
Return type:datetime.datetime
collapse(field_spec_list, name, reducer, append=False)

Collapses this event’s columns, represented by the field_spec_list, into a single column. The collapsing itself is done with the reducer function. Optionally the collapsed column can be appended to the existing columns; by default it replaces them.

Parameters:
  • field_spec_list (list) – List of columns to collapse. To retrieve deeply nested values, use dotted paths such as [‘can.be’, ‘done.with’, ‘this.notation’].
  • name (str) – Name of new column with collapsed data.
  • reducer (function) – Function to pass to reducer.
  • append (bool, optional) – Set True to add new column to existing data dict, False to create a new Event with just the collapsed data.
Returns:

New event object.

Return type:

Event
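The difference between append=False (the default) and append=True can be sketched on a plain data dict. This is an illustration only: the standalone `collapse()` below handles flat column names, not deep paths, and returns a dict rather than a new Event.

```python
def collapse(data, field_spec_list, name, reducer, append=False):
    """Reduce the listed columns of a data dict into one new column."""
    collapsed = reducer([data[field] for field in field_spec_list])
    if append:
        # Keep the existing columns and add the new one alongside them.
        result = dict(data)
        result[name] = collapsed
        return result
    # Default: the new column replaces the existing ones.
    return {name: collapsed}


data = {"in": 5, "out": 6, "status": "ok"}
replaced = collapse(data, ["in", "out"], "total", sum)
appended = collapse(data, ["in", "out"], "total", sum, append=True)
```

The input dict is left untouched in both cases, in keeping with the immutable style of the API.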

static combine(events, field_spec, reducer)

Combines multiple events together into a new array of events, one for each time/index/timerange of the source events. The list of events may be specified as an array or Immutable.List. Combining acts on the fields specified in the field_spec and uses the reducer function to take the multiple values and reduce them down to one.

The returned result will be of the same form as the input. If you pass in an array of events, you will get an array of events back. If you pass an Immutable.List of events then you will get an Immutable.List of events back.

This is the general version of Event.sum() and Event.avg(). If those common use cases are what you want, just use those functions. If you want to specify your own reducer you can use this function.

See also: TimeSeries.timeSeriesListSum()

Parameters:
  • events (list) – List of Event objects
  • field_spec (string, list) – Column or columns to look up. To retrieve multiple deeply nested values, use a list of dotted paths such as [‘can.be’, ‘done.with’, ‘this.notation’]; a single deep value can be written as a string.like.this. If None, all columns will be operated on.
  • reducer (function) – Reducer function to apply to column data
Returns:

List of new events

Return type:

list

Raises:

EventException – Raised if illegal input is received.

end()

The end time of this Event, which will be just the timestamp.

Returns:Datetime object
Return type:datetime.datetime
static is_duplicate(event1, event2, ignore_values=True)

Returns whether the two supplied events are duplicates of each other. By default, duplicated means that the timestamps are the same. This is the case with incoming events where the second event is either known to be the same as (a duplicate of) the first, or supersedes the first. You can also pass False for ignore_values and get a full compare.

Parameters:
  • event1 (Event, IndexedEvent or TimeSeriesEvent) – One of the event variants.
  • event2 (Event, IndexedEvent or TimeSeriesEvent) – One of the event variants.
  • ignore_values (bool, optional) – If set to False, the values of the events will be compared as well. The default (True) means only the type and key will be compared.
Returns:

True if the events are duplicates of each other.

Return type:

bool

static is_valid_value(event, field_path=None)

The same as Event.value() only it will return False if the value is either missing, NaN or None.

Parameters:
  • event (Event) – An event.
  • field_path (str, list, tuple, None, optional) –

    Name of value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’

    If field_path is None, then [‘value’] will be the default.

Returns:

Return false if undefined, NaN, or None.

Return type:

bool

key()

Return timestamp as ms since epoch

Returns:ms since epoch.
Return type:int
static map(events, field_spec=None)

Maps a list of events according to the selection specification in field_spec. The spec may be a single field name, a list of field names, or a function that takes an event and returns a key/value pair.

Example 1

        in   out
 3am    1    2
 4am    3    4

result ->  {in: [1, 3], out: [2, 4]}
Parameters:
  • events (list) – A list of events
  • field_spec (str, list, func or None, optional) –

    Column or columns to map. To retrieve multiple deeply nested values, use a list of dotted paths such as [‘can.be’, ‘done.with’, ‘this.notation’]; a single deep value can be written as a string.like.this. If None, then all columns will be mapped.

    If field_spec is a function, the function should return a dict. The keys will become the “column names” that will be used in the dict that is returned.

Returns:

A dict of mapped columns/values.

Return type:

dict
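The mapping in Example 1 can be reproduced with a small standalone function. This is a sketch that operates on plain data dicts instead of Event objects and supports only flat column names; `map_columns` is a hypothetical name used for this example.

```python
def map_columns(events, field_spec=None):
    """Gather the values of each column across a list of data dicts."""
    if isinstance(field_spec, str):
        field_spec = [field_spec]
    mapped = {}
    for data in events:
        if callable(field_spec):
            # A function spec returns the key/value pairs to collect.
            pairs = field_spec(data).items()
        else:
            columns = data.keys() if field_spec is None else field_spec
            pairs = [(column, data[column]) for column in columns]
        for column, value in pairs:
            mapped.setdefault(column, []).append(value)
    return mapped


events = [{"in": 1, "out": 2}, {"in": 3, "out": 4}]  # the 3am and 4am rows
all_columns = map_columns(events)
only_in = map_columns(events, "in")
renamed = map_columns(events, lambda data: {"sum": data["in"] + data["out"]})
```

With a function as the spec, the keys of the returned dict become the column names of the result, as described for field_spec above.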

static map_reduce(events, field_spec, reducer)

map and reduce

Parameters:
  • events (list) – A list of events
  • field_spec (str, list, func or None, optional) – Column or columns to map. To retrieve multiple deeply nested values, use a list of dotted paths such as [‘can.be’, ‘done.with’, ‘this.notation’]; a single deep value can be written as a string.like.this. If None, then all columns will be mapped.
  • reducer (function) – The reducer function.
Returns:

A dict as returned by reduce()

Return type:

dict

static merge(events)

Merges multiple events together into a new array of events, one for each time/index/timerange of the source events. Merging is done on the data of each event. Values from later events in the list overwrite early values if fields conflict.

Common use cases:

  • append events of different timestamps
  • merge in events with one field to events with another
  • merge in events that supersede the previous events
Parameters:events (list) – A list of a homogenous kind of event.
Returns:A list of the merged events.
Return type:list
Raises:EventException – Raised if event list is not homogenous.
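The three common use cases listed above can be sketched with (key, data) tuples in place of Event objects. This is an illustration of the "later values overwrite earlier ones" rule, not pypond's implementation; sorting the result by key is a choice made for this sketch.

```python
def merge(events):
    """Merge (key, data) pairs that share a key; later values win."""
    merged = {}
    for key, data in events:
        # update() overwrites fields that already exist for this key.
        merged.setdefault(key, {}).update(data)
    return sorted(merged.items())


events = [
    (1000, {"in": 1}),
    (1000, {"out": 2}),  # different field, same timestamp
    (2000, {"in": 3}),   # different timestamp
    (1000, {"in": 9}),   # supersedes the first "in" value
]
result = merge(events)
```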
static reduce(mapped, reducer)

Takes the dict produced by map() and a reducer function and returns a dict with the reduced result for each column. The reducer is of the form

def total(value_list):
    # collapse the list of column values to a single value
    return sum(value_list)
Parameters:
  • mapped (dict) – Dict as produced by map()
  • reducer (function) – The reducer function.
Returns:

A dict of reduced values.

Return type:

dict
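The reduce step can be sketched as a dict comprehension over the mapped columns. This is a minimal illustration with hypothetical names (`reduce_columns`, `average`); it assumes the input has the shape documented for map(), a dict of column name to list of values.

```python
def reduce_columns(mapped, reducer):
    """Apply `reducer` to the value list of every mapped column."""
    return {column: reducer(values) for column, values in mapped.items()}


def average(value_list):
    return sum(value_list) / len(value_list)


mapped = {"in": [1, 3], "out": [2, 4]}  # shaped like the output of map()
totals = reduce_columns(mapped, sum)
means = reduce_columns(mapped, average)
```

Any callable that takes a list and returns a single value can serve as the reducer.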

static same(event1, event2)

Different name for is() which is an invalid method name. Different than __eq__ - see Object.is() JS documentation.

Check if the two objects are the same.

Parameters:
  • event1 (Event) – An event.
  • event2 (Event) – Another event.
Returns:

Returns True if the event payloads are the same.

Return type:

bool

static selector(event, field_spec=None)

Function to select specific fields of an event using a fieldSpec and return a new event with just those fields.

The fieldSpec currently can be:

  • A single field name
  • A list of field names

The function returns a new event.

Parameters:
  • event (Event) – Event to pull from.
  • field_spec (str, list, tuple, None, optional) – Column or columns to look up. To retrieve multiple deeply nested values, use a list of dotted paths such as [‘can.be’, ‘done.with’, ‘this.notation’]; a single deep value can be written as a string.like.this. If None, the default column ‘value’ will be used.
Returns:

A new event object.

Return type:

Event

set_data(data)

Sets the data portion of the event and returns a new Event.

Parameters:data (dict) – New data payload for this event object.
Returns:A new event object.
Return type:Event
static sum(events, field_spec=None, filter_func=None)

combine() called with a summing function as a reducer. All of the events need to have the same timestamp.

Parameters:
  • events (list) – A list of Event objects
  • field_spec (list, str, None, optional) – Column or columns to look up. To retrieve multiple deeply nested values, use a list of dotted paths such as [‘can.be’, ‘done.with’, ‘this.notation’]; a single deep value can be written as a string.like.this. If None, all columns will be operated on.
  • filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns:

A list containing the summed events.

Return type:

list

Raises:

EventException – Raised on mismatching timestamps.

timestamp()

The timestamp of this data

Returns:Datetime object
Return type:datetime.datetime
timestamp_as_local_string()

The timestamp of this data, in Local time, as a formatted string.

Returns:Formatted date string.
Return type:str
timestamp_as_utc_string()

The timestamp of this data, in UTC time, as a formatted string.

Returns:Formatted date string.
Return type:str
to_json()

Returns the Event as a JSON object, essentially

{time: ms_since_epoch, data: {key: value, ...}}

This is actually like json.loads(s) - produces the actual data structure from the object internal data.

Returns:time/data keys
Return type:dict
to_point(cols=None)

Returns a flat array starting with the timestamp, followed by the values. Can be given an optional list of columns so the returned list will have the values in order. Primarily for the TimeSeries wire format.

Parameters:cols (list, optional) – List of data columns to order the data points in so the TimeSeries wire format lines up correctly. If not specified, the points will be whatever order that dict.values() decides to return it in.
Returns:Epoch ms followed by points.
Return type:list
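The role of cols in fixing the column order can be sketched as follows. This standalone `to_point()` takes the epoch ms and data dict as explicit arguments, which is an assumption of the example; the documented method reads them from the Event itself.

```python
def to_point(epoch_ms, data, cols=None):
    """Flatten a timestamp and its data dict into a single list."""
    if cols is None:
        # Without cols the order is whatever the dict yields.
        values = list(data.values())
    else:
        values = [data[col] for col in cols]
    return [epoch_ms] + values


data = {"out": 2, "in": 1}
point = to_point(1429673400000, data, cols=["in", "out"])
```

Passing cols makes the output independent of the order in which the data dict was built.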
type()

Return type of the event object

Returns:Return the class of this event type.
Return type:class
class pypond.event.EventBase(underscore_d)

Bases: pypond.bases.PypondBase

Common code for the event classes.

Parameters:underscore_d (pyrsistent.pmap) – Immutable dict-like object containing the payload for the events.
__eq__(other)

Equality operator. This is needed to check whether the event_list in one collection is the same as another.

Parameters:other (Event) – Event object for == comparison.
Returns:True if other event has same payload.
Return type:bool
__str__()

call to_string()

begin()

abstract, override in subclass

Raises:NotImplementedError – Needs to be implemented in subclasses.
data()

Direct access to the event data. The result will be a pyrsistent.pmap.

Returns:The immutable data payload.
Return type:pyrsistent.pmap
static data_from_arg(arg)

extract data from a constructor arg and make immutable.

Parameters:arg (dict, pmap, int, float, str) – Data payload as passed to one of the constructors. If it is a dict or pmap, it is used as the data payload. Any other value is presumed to be a simple payload of {‘value’: arg}.
Returns:Immutable dict-like object
Return type:pyrsistent.pmap
Raises:EventException – Raised on bad arg input.
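
The normalization rule above can be sketched without the library. In this example data_from_arg_sketch is a hypothetical name, types.MappingProxyType stands in for pyrsistent.pmap as a read-only mapping, and ValueError stands in for EventException. All three are assumptions made to keep the example self-contained.

```python
from types import MappingProxyType


def data_from_arg_sketch(arg):
    """Normalize a constructor argument into a read-only mapping."""
    if isinstance(arg, (dict, MappingProxyType)):
        payload = dict(arg)
    elif isinstance(arg, (int, float, str)):
        # Simple values are shorthand for a payload with a single 'value' key.
        payload = {'value': arg}
    else:
        # Stand-in for the EventException raised on bad input.
        raise ValueError('could not interpret data from {0!r}'.format(arg))
    return MappingProxyType(payload)


simple = data_from_arg_sketch(42)
mapped = data_from_arg_sketch({'in': 1, 'out': 2})
```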
end()

abstract, override in subclass

Raises:NotImplementedError – Needs to be implemented in subclasses.
get(field_path=None)

Get specific data out of the Event. The data will be converted to a js object. You can use a fieldSpec to address deep data. A fieldSpec could be “a.b” or it could be [‘a’, ‘b’]. Favor the list version please.

The field spec can have an arbitrary number of “parts.”

Parameters:field_path (str, list, tuple, None, optional) –

Name of value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’

If field_path is None, then [‘value’] will be the default.

Returns:Type depends on underlying data.
Return type:various
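
To make the field path notation concrete, here is a minimal sketch of a deep lookup over a plain nested dict. get_sketch is a hypothetical function, and returning None for a missing path is an assumption of this example rather than documented behavior.

```python
def get_sketch(data, field_path=None):
    """Look up a possibly nested value using 'a.b' or ['a', 'b'] notation."""
    if field_path is None:
        parts = ['value']  # documented default
    elif isinstance(field_path, str):
        parts = field_path.split('.')
    else:
        parts = list(field_path)  # list or tuple of path segments

    current = data
    for part in parts:
        if not isinstance(current, dict) or part not in current:
            return None  # assumption: a missing path yields None
        current = current[part]
    return current


payload = {'value': 7, 'deep': {'value': 11}}
```

The string, list and tuple forms all resolve to the same sequence of keys, so they return the same value.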
static index_from_args(instance_or_index, utc=True)

create Index from a constructor arg.

Parameters:
  • instance_or_index (Index or str) – Index value as passed to a constructor
  • utc (bool, optional) – Use utc time internally, please don’t not do this.
Returns:

New Index object from args.

Return type:

Index

Raises:

EventException – Raised on invalid arg.

stringify()

Produce a json string of the internal data.

Returns:String representation of this object’s internal data.
Return type:str
static timerange_from_arg(arg)

create TimeRange from a constructor arg.

Parameters:arg (list, tuple, pvector or TimeRange) – Time value as passed to one of the constructors.
Returns:New TimeRange instance from args
Return type:TimeRange
Raises:EventException – Raised on invalid arg.
timestamp()

abstract, override in subclass

Raises:NotImplementedError – Needs to be implemented in subclasses.
static timestamp_from_arg(arg)

extract timestamp from a constructor arg.

Parameters:arg (int or datetime.datetime) – Time value as passed to one of the constructors
Returns:Datetime object that has been sanitized
Return type:datetime.datetime
Raises:EventException – Does not accept unaware datetime objects.
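
A standalone sketch of this kind of sanitizing follows. The name timestamp_from_arg_sketch is hypothetical, ValueError stands in for EventException, and converting aware datetimes to UTC is an assumption about what "sanitized" means here.

```python
from datetime import datetime, timezone


def timestamp_from_arg_sketch(arg):
    """Return an aware UTC datetime from epoch ms or an aware datetime."""
    if isinstance(arg, int):
        # Integers are treated as milliseconds since the epoch.
        return datetime.fromtimestamp(arg / 1000.0, tz=timezone.utc)
    if isinstance(arg, datetime):
        if arg.tzinfo is None or arg.utcoffset() is None:
            # Naive (unaware) datetimes are rejected.
            raise ValueError('naive datetime objects are not accepted')
        return arg.astimezone(timezone.utc)
    raise ValueError('unable to get a timestamp from {0!r}'.format(arg))


from_ms = timestamp_from_arg_sketch(1426318200000)
```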
to_json()

abstract, override in subclasses.

Raises:NotImplementedError – Needs to be implemented in subclasses.
to_string()

Returns the Event as a string, useful for serialization. It’s a JSON string of the whole object.

In JS land, this is synonymous with __str__ or __unicode__

Returns:String representation of this object.
Return type:str
ts

A property to expose the datetime.datetime value returned by the timestamp() method. This is so we can support sorting of a list of events via the following method:

ordered = sorted(self._event_list, key=lambda x: x.ts)
Returns:Returns the value returned by timestamp()
Return type:datetime.datetime
value(field_path=None)

Alias for get()

Parameters:field_path (str, list, tuple, None) –

Name of value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’

If field_path is None, then [‘value’] will be the default.

Returns:Type depends on underlying data.
Return type:various

pypond.exceptions module

Custom exception and warning classes.

exception pypond.exceptions.CollectionException(value)

Bases: exceptions.Exception

Custom Collection exception

exception pypond.exceptions.CollectionWarning

Bases: exceptions.Warning

Custom Collection warning

exception pypond.exceptions.EventException(value)

Bases: exceptions.Exception

Custom Event exception

exception pypond.exceptions.EventWarning

Bases: exceptions.Warning

Custom Event warning

exception pypond.exceptions.FilterException(value)

Bases: exceptions.Exception

Custom Filter exception

exception pypond.exceptions.FilterWarning

Bases: exceptions.Warning

Custom Filter warning

exception pypond.exceptions.FunctionException(value)

Bases: exceptions.Exception

Custom Function exception

exception pypond.exceptions.FunctionWarning

Bases: exceptions.Warning

Custom Function warning

exception pypond.exceptions.IndexException(value)

Bases: exceptions.Exception

Custom Index exception

exception pypond.exceptions.IndexWarning

Bases: exceptions.Warning

Custom Index warning

exception pypond.exceptions.PipelineException(value)

Bases: exceptions.Exception

Custom Pipeline exception

exception pypond.exceptions.PipelineIOException(value)

Bases: exceptions.Exception

Custom PipelineIO exception

exception pypond.exceptions.PipelineIOWarning

Bases: exceptions.Warning

Custom PipelineIO warning

exception pypond.exceptions.PipelineWarning

Bases: exceptions.Warning

Custom Pipeline warning

exception pypond.exceptions.ProcessorException(value)

Bases: exceptions.Exception

Custom Processor exception

exception pypond.exceptions.ProcessorWarning

Bases: exceptions.Warning

Custom Processor warning

exception pypond.exceptions.TimeRangeException(value)

Bases: exceptions.Exception

Custom TimeRange exception

exception pypond.exceptions.TimeRangeWarning

Bases: exceptions.Warning

Custom TimeRange warning

exception pypond.exceptions.TimeSeriesException(value)

Bases: exceptions.Exception

Custom TimeSeries exception

exception pypond.exceptions.TimeSeriesWarning

Bases: exceptions.Warning

Custom TimeSeries warning

exception pypond.exceptions.UtilityException(value)

Bases: exceptions.Exception

Custom Utility exception

exception pypond.exceptions.UtilityWarning

Bases: exceptions.Warning

Custom Utility warning

pypond.functions module

Functions to act as reducers/aggregators, etc.

class pypond.functions.Filters

Bases: object

Filter functions to pass to aggregation function factory methods.

These all control how the underlying aggregators handle missing/invalid values. Can pass things through (the default to all agg functions), ignore any bad values, transform any bad values to zero, or make the entire aggregation fail if there are any bad values.

static ignore_missing(events)

Pull out the bad values resulting in a shorter array.

static keep_missing(events)

no-op - default

static none_if_empty(events)

Return None if the event list is empty. Could be used to override the default behavior of Functions.avg(), etc.

static propogate_missing(events)

It’s all bad if there are missing values - return None if so.

static zero_missing(events)

Make bad values 0 - array will be the same length.

class pypond.functions.Functions

Bases: object

Utility class to contain the functions.

The inner() function is the one that does the actual processing, and it is returned by calling the outer named function. Previously one would pass Functions.sum to an aggregation or reducer method:

timeseries.aggregate(Functions.sum, 'in')

Now it is a factory that returns the actual function:

timeseries.aggregate(Functions.sum(), 'in')

The static methods in the Filters class can be passed to the outer factory method to control how bad values are handled:

timeseries.aggregate(Functions.sum(Filters.zero_missing), 'in')
static avg(flt=<function keep_missing>)
static count(flt=<function keep_missing>)
static difference(flt=<function keep_missing>)
static first(flt=<function keep_missing>)
static keep(flt=<function keep_missing>)
static last(flt=<function keep_missing>)
static max(flt=<function keep_missing>)
static median(flt=<function keep_missing>)
static min(flt=<function keep_missing>)
static percentile(perc, method='linear', flt=<function keep_missing>)
static stddev(flt=<function keep_missing>)
static sum(flt=<function keep_missing>)
pypond.functions.f_check(flt)

Set the default filter for aggregation operations when no filter is specified. When one is, make sure that it is a valid filter.
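
The factory-plus-filter pattern described for Functions and Filters can be sketched in plain Python. Everything below is a simplified stand-in written for illustration: the functions operate on a list of raw values rather than events, sum_factory is a hypothetical name, and the exact definition of a "missing" value is taken from the description of filter_func (None, NaN, empty string).

```python
import math


def is_missing(value):
    """Treat None, NaN and the empty string as missing."""
    if value is None or value == '':
        return True
    return isinstance(value, float) and math.isnan(value)


def keep_missing(values):
    return values  # no-op default


def ignore_missing(values):
    return [v for v in values if not is_missing(v)]  # shorter list


def zero_missing(values):
    return [0 if is_missing(v) else v for v in values]  # same length


def propogate_missing(values):
    # Name mirrors the spelling used in the listing above.
    return None if any(is_missing(v) for v in values) else values


def sum_factory(flt=keep_missing):
    """Outer factory: returns the inner function that does the summing."""
    def inner(values):
        cleaned = flt(values)
        if cleaned is None:
            return None
        return sum(cleaned)
    return inner


values = [1, None, 3]
zeroed = sum_factory(zero_missing)(values)
```

With the default keep_missing filter, the None in values reaches sum() and raises a TypeError, which illustrates why retaining missing values can cause errors during aggregation.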

pypond.index module

Implementation of Pond Index class.

http://software.es.net/pond/#/index

class pypond.index.Index(s, utc=True)

Bases: pypond.bases.PypondBase

An index that represents as a string a range of time. That range may either be in UTC or local time. UTC is the default.

The actual derived timerange can be found using as_timerange(). This will return a TimeRange instance.

The original string representation can be found with to_string(). A nice version for date based indexes (e.g. 2015-03) can be generated with to_nice_string(fmt) (e.g. March, 2015).

The index string arg may take one of two forms:

  • 2015-07-14 (day)
  • 2015-07 (month)
  • 2015 (year)

or:

  • 1d-278 (range, in n x days, hours, minutes or seconds)
Parameters:
  • s (str) – The index string in one of the aforementioned formats.
  • utc (bool, optional) – Index interpreted as UTC or localtime. Please don’t set this to false since non-UTC times are the devil.
Raises:

IndexException – Raised if arg s could not be translated into a valid timerange/index.

__str__()

call to_string()

Returns:String representation of the object.
Return type:str
as_string()

Alias for to_string()

Returns:The index string as previously outlined.
Return type:str
as_timerange()

Returns the Index as a TimeRange

Returns:The underlying time range object.
Return type:TimeRange
begin()

Returns start date of the index.

Returns:Start date of the index.
Return type:datetime.datetime
end()

Returns end date of the index.

Returns:End date of the index.
Return type:datetime.datetime
static get_daily_index_string(date, utc=True)

Generate an index string with day granularity.

Parameters:
  • date (datetime.datetime) – An aware UTC datetime object
  • utc (bool, optional) – Whether to render the index in UTC. Set to False to render it in local time, which is used for display purposes to render charts in a localized way.
Returns:

The formatted index string.

Return type:

string

static get_index_string(win, dtime)

Return the index string given an index prefix and a datetime object. Example usage follows.

dtime = aware_dt_from_args(
    dict(year=2015, month=3, day=14, hour=7, minute=32, second=22))

idx_str = Index.get_index_string('5m', dtime)

self.assertEqual(idx_str, '5m-4754394')

previously: Generator.bucketIndex

Parameters:
  • win (str) – Prefix of the index string.
  • dtime (datetime.datetime) – Datetime to generate index string from.
Returns:

The index string.

Return type:

str

static get_index_string_list(win, timerange)

Given the time range, return a list of strings of index values every <prefix> tick. Example usage follows (from test suite).

dtime_1 = aware_dt_from_args(
    dict(year=2015, month=3, day=14, hour=7, minute=30, second=0))

dtime_2 = aware_dt_from_args(
    dict(year=2015, month=3, day=14, hour=8, minute=29, second=59))

idx_list = Index.get_index_string_list('5m', TimeRange(dtime_1, dtime_2))

self.assertEqual(len(idx_list), 12)
self.assertEqual(idx_list[0], '5m-4754394')
self.assertEqual(idx_list[-1], '5m-4754405')

previously: Generator.bucketIndexList

Parameters:
  • win (str) – Prefix of the index string.
  • timerange (TimeRange) – Time range object to generate index string from
Returns:

A list of strings of index values at every “tick” in the range specified.

Return type:

list
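
The arithmetic behind these index strings can be reproduced with the standard library. The sketch below assumes the suffix is the number of whole windows since the epoch, which is consistent with the test values quoted above. The function names are hypothetical, and the window argument is assumed to be just the prefix (for example '5m').

```python
import re
from datetime import datetime, timezone

UNIT_MS = {'s': 1000, 'm': 60 * 1000, 'h': 60 * 60 * 1000, 'd': 24 * 60 * 60 * 1000}


def window_duration_sketch(win):
    """Duration in ms for a window prefix such as '5m' or '1d'."""
    match = re.match(r'^(\d+)([smhd])$', win)
    if match is None:
        raise ValueError('bad window string: {0}'.format(win))
    return int(match.group(1)) * UNIT_MS[match.group(2)]


def window_position_sketch(win, dtime):
    """Number of whole windows between the epoch and dtime."""
    epoch_ms = int(dtime.timestamp() * 1000)
    return epoch_ms // window_duration_sketch(win)


def index_string_list_sketch(win, begin, end):
    """One index string per window touched by the range [begin, end]."""
    first = window_position_sketch(win, begin)
    last = window_position_sketch(win, end)
    return ['{0}-{1}'.format(win, pos) for pos in range(first, last + 1)]


begin = datetime(2015, 3, 14, 7, 30, 0, tzinfo=timezone.utc)
end = datetime(2015, 3, 14, 8, 29, 59, tzinfo=timezone.utc)
idx_list = index_string_list_sketch('5m', begin, end)
```

One hour of data at a five minute window yields twelve index strings, matching the assertions in the example from the test suite.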

static get_monthly_index_string(date, utc=True)

Generate an index string with month granularity.

Parameters:
  • date (datetime.datetime) – An aware UTC datetime object
  • utc (bool, optional) – Whether to render the index in UTC. Set to False to render it in local time, which is used for display purposes to render charts in a localized way.
Returns:

The formatted index string.

Return type:

string

static get_yearly_index_string(date, utc=True)

Generate an index string with year granularity.

Parameters:
  • date (datetime.datetime) – An aware UTC datetime object
  • utc (bool, optional) – Whether to render the index in UTC. Set to False to render it in local time, which is used for display purposes to render charts in a localized way.
Returns:

The formatted index string.

Return type:

string
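
For the three calendar granularities, the index string shapes listed for the Index class (2015-07-14, 2015-07, 2015) correspond to simple strftime() formats. This sketch uses hypothetical function names and ignores the utc flag, so it only covers the UTC case.

```python
from datetime import datetime, timezone


def daily_index_string_sketch(date):
    """Day granularity, e.g. 2015-07-14."""
    return date.strftime('%Y-%m-%d')


def monthly_index_string_sketch(date):
    """Month granularity, e.g. 2015-07."""
    return date.strftime('%Y-%m')


def yearly_index_string_sketch(date):
    """Year granularity, e.g. 2015."""
    return date.strftime('%Y')


when = datetime(2015, 7, 14, 12, 0, tzinfo=timezone.utc)
daily = daily_index_string_sketch(when)
```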

range_from_index_string(idx_str, is_utc=True)

Generate the time range from the idx string.

The index string arg may take one of two forms:

  • 2015-07-14 (day)
  • 2015-07 (month)
  • 2015 (year)

or:

  • 1d-278 (range, in n x days, hours, minutes or seconds)

and return a TimeRange for that time. The TimeRange may be considered to be local time or UTC time, depending on the utc flag passed in.

This was in src/util.js in the original project, but the only thing using the code in that util.js was the Index class, and it makes more sense having this as a class method and setting self._index_type makes further regex analysis of the index unnecessary.

Parameters:
  • idx_str (str) – The index string in one of the aforementioned formats.
  • is_utc (bool, optional) – Index interpreted as utc or localtime. Please don’t use localtime.
Returns:

A time range made from the interpreted index string.

Return type:

TimeRange

Raises:

IndexException – Raised when the string format is determined to be invalid.
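
As an illustration of the range style form only, the following sketch turns a string such as 1d-278 into a begin and end time. It is not the library implementation: the function name is hypothetical, ValueError stands in for IndexException, calendar style strings are not handled, and treating the end as begin plus one window length is an assumption.

```python
import re
from datetime import datetime, timedelta, timezone

UNIT_SECONDS = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def range_from_index_string_sketch(idx_str):
    """Return (begin, end) UTC datetimes for a range style index string."""
    match = re.match(r'^(\d+)([smhd])-(\d+)$', idx_str)
    if match is None:
        # Stand-in for the IndexException raised on an invalid format.
        raise ValueError('invalid index string: {0}'.format(idx_str))
    count = int(match.group(1))
    unit = match.group(2)
    position = int(match.group(3))
    length = timedelta(seconds=count * UNIT_SECONDS[unit])
    begin = EPOCH + length * position  # position counts whole windows since the epoch
    return begin, begin + length


day_begin, day_end = range_from_index_string_sketch('1d-278')
```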

to_json()

Returns the Index as JSON, which will just be its string representation

This is actually like json.loads(s) - produces the actual data structure.

Returns:The index string as previously outlined.
Return type:str
to_nice_string(fmt=None)

For the calendar range style Indexes, this lets you return that calendar range in a human readable format, e.g. “June, 2014”. The format is specified as a strftime() format string.

Originally implemented at Util.niceIndexString in the JS source, this is just a greatly simplified version using self._index_type.

Parameters:fmt (str, optional) – User can pass in a valid strftime() format string.
Returns:The index text string as a formatted (strftime()) time.
Return type:str
to_string()

Simply returns the Index as its string

In JS land, this is synonymous with __str__ or __unicode__

Returns:The index string as previously outlined.
Return type:str
utc

accessor for internal utc boolean.

static window_duration(win)

duration in ms given a window duration string.

previously: Generator.getLengthFromSize.

Parameters:win (str) – An index string in the previously mentioned 1d-278 style format.
Returns:Duration of the index/range in ms.
Return type:int
static window_position_from_date(win, dtime)

window position from datetime object. Called by get_index_string_list().

previously: Generator.getBucketPosFromDate

Parameters:
  • win (str) – Prefix of the index string.
  • dtime (datetime.datetime) – Datetime to calculate suffix from.
Returns:

The suffix for the index string.

Return type:

int

pypond.indexed_event module

Event with a time range specified as an index.

class pypond.indexed_event.IndexedEvent(instance_or_begin, data=None, utc=True)

Bases: pypond.event.EventBase

Associates a time range specified as an index.

The creation of an IndexedEvent is done by combining two parts: the Index and the data.

To construct you specify an Index, along with the data.

The index may be an Index, or a string.

To specify the data you can supply either:
  • a python dict containing key values pairs
  • a pyrsistent.pmap, or
  • a simple type such as an integer. In the case of the simple type this is a shorthand for supplying {“value”: v}.
Parameters:
  • instance_or_begin (Index, pyrsistent.pmap, or str.) – Index for copy constructor, pmap as the fully formed internals or a string arg to the Index class.
  • data (dict or pyrsistent.pmap, optional) – Data payload.
  • utc (bool, optional) – UTC or localtime to create index in. Please don’t not use UTC. Yes, that’s a double negative.
begin()

The begin time of this Event, which will be just the timestamp.

Returns:Datetime of the beginning of the range.
Return type:datetime.datetime
end()

The end time of this Event, which will be just the timestamp.

Returns:Datetime of the end of the range.
Return type:datetime.datetime
index()

Returns the Index associated with the data in this Event.

Returns:The underlying index object
Return type:Index
index_as_string()

Returns the Index as a string, same as event.index().to_string().

Returns:String version of the underlying index.
Return type:str
key()

Return the index string of this object.

Returns:The index of this object.
Return type:Index
set_data(data)

Sets the data portion of the event and returns a new IndexedEvent.

Parameters:data (dict) – The new data payload for this event object.
Returns:A new indexed event with the provided payload.
Return type:IndexedEvent

timerange()

The TimeRange of this data.

Returns:Time range from the underlying index.
Return type:TimeRange
timerange_as_local_string()

The timerange of this data, in Local time, as a string.

Returns:Underlying TimeRange as localtime string.
Return type:str
timerange_as_utc_string()

The timerange of this data, in UTC time, as a string.

Returns:Underlying TimeRange as UTC string.
Return type:str
timestamp()

The timestamp of the beginning of the range.

Returns:Datetime of the beginning of the range.
Return type:datetime.datetime
to_json()

Returns the Event as a JSON object, essentially: {time: t, data: {key: value, ...}}

This is actually like json.loads(s) - produces the actual vanilla data structure.

Returns:Dictionary representation of object internals.
Return type:dict
to_point(cols=None)

Returns a flat array starting with the timestamp, followed by the values. Doesn’t include the groupByKey (key).

Can be given an optional list of columns so the returned list will have the values in order. Primarily for the TimeSeries wire format.

Parameters:cols (list, optional) – List of columns to order the points in so the TimeSeries wire format is rendered correctly.
Returns:Epoch ms followed by points.
Return type:list
type()

Return the class of this event type.

Returns:The class of this event type.
Return type:class

pypond.pipeline module

Implementation of the Pond Pipeline classes.

http://software.es.net/pond/#/pipeline

class pypond.pipeline.Pipeline(arg=None)

Bases: pypond.bases.PypondBase

Build a new Pipeline.

A pipeline manages a processing chain, for either batch or stream processing of collection data.

The argument may be either:

  • a Pipeline (copy ctor)
  • a pyrsistent.PMap in which case the internal state will be constructed from the map.

Usually you would initialize a Pipeline using the factory function, rather than this object directly.

Parameters:arg (Pipeline, PMap, None) – See above.
add_result(arg1, arg2=None)

Add the incoming result from the processor callback.

Parameters:
  • arg1 (str) – Collection key string.
  • arg2 (Collection or str) – Generally the incoming collection.
aggregate(fields)

Uses the current Pipeline windowing and grouping state to build collections of events and aggregate them.

IndexedEvents will be emitted out of the aggregator based on the emitOn state of the Pipeline.

To specify what part of the incoming events should be aggregated together you specify a fields object. This is a map from fieldName to operator.

uin = Stream()

(
    Pipeline()
    .from_source(uin)
    .window_by('1h')
    .emit_on('eachEvent')
    .aggregate(
        {
            'in_avg': {'in': Functions.avg()},
            'out_avg': {'out': Functions.avg()}
        }
    )
    .to(EventOut, cback)
)
Parameters:fields (dict) – Fields and operators to be aggregated. Deep fields may be indicated by using this.style.notation. As in the above example, the fields.keys() are the names of the new columns to be created (or an old one to be overwritten), and the value is another dict - the key is the existing column and the value is the function to apply to it when creating the new column.
Returns:The Pipeline
Return type:Pipeline
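
The shape of the fields argument is easier to see when applied by hand to one window of data. The sketch below is a simplification under stated assumptions: events are flat dicts, windowing and emitting are left out, and avg and aggregate_sketch are hypothetical stand-ins for Functions.avg() and the real aggregator.

```python
def avg(values):
    """Arithmetic mean of a non-empty list of numbers."""
    return sum(values) / float(len(values))


def aggregate_sketch(events, fields):
    """Reduce a list of flat dict payloads to one dict of new columns."""
    result = {}
    for new_col, spec in fields.items():
        # Each spec maps one existing column to the function applied to it.
        for old_col, func in spec.items():
            result[new_col] = func([event[old_col] for event in events])
    return result


window = [{'in': 2, 'out': 11}, {'in': 4, 'out': 13}]
summary = aggregate_sketch(
    window,
    {'in_avg': {'in': avg}, 'out_avg': {'out': avg}},
)
```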
align(field_spec=None, window='5m', method='linear', limit=None)

Align entry point

as_events(options=None)

Converts incoming TimeRangeEvents or IndexedEvents to Events. This is helpful since some processors will emit TimeRangeEvents or IndexedEvents, which may be unsuitable for some applications.

There are three options:

  1. use the beginning time (options = Options(alignment=’lag’))
  2. use the center time (options = Options(alignment=’center’))
  3. use the end time (options = Options(alignment=’lead’))
Parameters:options (Options) – The options, see above.
Returns:The Pipeline.
Return type:Pipeline
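
The three alignment options reduce to picking a single instant from a range. A minimal sketch, using a hypothetical event_time_sketch function that works on plain begin and end datetimes rather than on event objects:

```python
from datetime import datetime, timezone


def event_time_sketch(begin, end, alignment='center'):
    """Pick the timestamp to use when flattening a range to a single time."""
    if alignment == 'lag':
        return begin  # beginning of the range
    if alignment == 'lead':
        return end  # end of the range
    if alignment == 'center':
        return begin + (end - begin) / 2  # midpoint of the range
    raise ValueError('unknown alignment: {0}'.format(alignment))


range_begin = datetime(2015, 3, 14, 12, 0, tzinfo=timezone.utc)
range_end = datetime(2015, 3, 14, 13, 0, tzinfo=timezone.utc)
centered = event_time_sketch(range_begin, range_end)
```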
as_indexed_events(options=None)

Converts incoming Events to IndexedEvents.

Note: It isn’t possible to convert TimeRangeEvents to IndexedEvents.

Parameters:options (Options) – Contains the conversion options. In this case, the duration string of the Index is expected. Must contain the key ‘duration’ and the duration string is of the form “1h” for one hour, “30s” for 30 seconds and so on.
Returns:The Pipeline.
Return type:Pipeline
as_time_range_events(options=None)

Converts incoming Events or IndexedEvents to TimeRangeEvents.

There are three options for alignment:

  1. time range will be in front of the timestamp - ie: options = Options(alignment=’front’)
  2. time range will be centered on the timestamp - ie: options = Options(alignment=’center’)
  3. time range will be positioned behind the timestamp - ie: options = Options(alignment=’behind’)

The duration is of the form “1h” for one hour, “30s” for 30 seconds and so on.

Parameters:options (dict) – Args to add to Options - duration and alignment.
Returns:The Pipeline
Return type:Pipeline
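
One possible reading of the three alignments is sketched below. It assumes that ‘front’ means the range starts at the timestamp and extends forward, and that ‘behind’ means the range ends at the timestamp. That interpretation, the function name, and the use of a timedelta in place of a duration string are all assumptions of this example.

```python
from datetime import datetime, timedelta, timezone


def time_range_for_sketch(timestamp, duration, alignment='center'):
    """Return (begin, end) for a range of the given duration around a timestamp."""
    if alignment == 'front':
        return timestamp, timestamp + duration  # assumed: range starts at the timestamp
    if alignment == 'center':
        half = duration / 2
        return timestamp - half, timestamp + half
    if alignment == 'behind':
        return timestamp - duration, timestamp  # assumed: range ends at the timestamp
    raise ValueError('unknown alignment: {0}'.format(alignment))


noon = datetime(2015, 3, 14, 12, 0, tzinfo=timezone.utc)
hour = timedelta(hours=1)
centered_range = time_range_for_sketch(noon, hour, 'center')
```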
clear_group_by()

Remove the grouping from the pipeline. In other words recombine the events.

Returns:The Pipeline
Return type:Pipeline
clear_results()

Clear the result state of this Pipeline instance.

clear_window()

Remove windowing from the Pipeline. This will return the pipeline to no window grouping. This is useful if you have first done some aggregation by some window size and then wish to collect together all the resulting events.

Returns:The Pipeline
Return type:Pipeline
collapse(field_spec_list, name, reducer, append=True)

Collapse a subset of columns using a reducer function.

Parameters:
  • field_spec_list (list) – List of columns to collapse. Deeply nested values can be retrieved with entries such as [‘can.be’, ‘done.with’, ‘this.notation’].
  • name (string) – The resulting output column’s name.
  • reducer (function) – Function to use to do the reduction.
  • append (bool) – Add the new column to the existing ones, or replace them.
Returns:

The Pipeline.

Return type:

Pipeline
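
The effect of append can be shown on a single flat payload. In this sketch collapse_sketch is a hypothetical function operating on a plain dict, and the built-in sum stands in for a reducer such as Functions.sum().

```python
def collapse_sketch(data, field_spec_list, name, reducer, append=True):
    """Reduce several columns of one payload into a single named column."""
    values = [data[col] for col in field_spec_list]
    collapsed = {name: reducer(values)}
    if append:
        # Keep the existing columns and add the new one.
        merged = dict(data)
        merged.update(collapsed)
        return merged
    # Replace the existing columns with just the new one.
    return collapsed


row = {'in': 1, 'out': 2}
appended = collapse_sketch(row, ['in', 'out'], 'total', sum)
```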

count(observer, force=True)

Outputs the count of events.

Parameters:
  • observer (function) – The callback function. This function will be passed (collection.size(), window_key, group_by_key) as args.
  • force (bool, optional) – Flush at the end of processing batch events, output again with possibly partial result
Returns:

The Pipeline.

Return type:

Pipeline

emit_on(trigger)

Sets the condition under which an accumulated collection will be emitted. If specified before an aggregation this will control when the resulting event will be emitted relative to the window accumulation. Current options are:

  • to emit on every event, or
  • just when the collection is complete, or
  • when a flush signal is received, either manually calling done(), or at the end of a bounded source.

The trigger string indicates when a Collection should be emitted, and can be:

  • “eachEvent” - when a new event comes in, all currently maintained collections will emit their result.
  • “discard” - when a collection is to be discarded, first it will emit. But only then.
  • “flush” - when a flush signal is received.

The difference will depend on the output you want, how often you want to get updated, and if you need to get a partial state. There’s currently no support for late data or watermarks. If an event comes in after a collection window, that collection is considered finished.

Parameters:trigger (string) – See above
Returns:The Pipeline
Return type:Pipeline
fill(field_spec=None, method='zero', fill_limit=None)

Take the data in this timeseries and “fill” any missing or invalid values. This could be setting None values to zero so mathematical operations will succeed, interpolate a new value, or pad with the previously given value.

Parameters:
  • field_spec (str, list, tuple, None, optional) –

    Column or columns to look up. Multiple deeply nested values can be retrieved with a list such as [‘can.be’, ‘done.with’, ‘this.notation’], and a single deep value with a string.like.this.

    If None, the default column field ‘value’ will be used.

  • method (str, optional) – Filling method: zero | linear | pad
  • fill_limit (None, optional) – Set a limit on the number of consecutive events that will be filled before it starts returning invalid values. For linear fill, no filling will happen if the limit is reached before a valid value is found.
Returns:

The Pipeline.

Return type:

Pipeline
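
To illustrate how method and fill_limit interact, here is a small sketch covering only the zero and pad methods over a list of values, where None marks a missing value. The function name is hypothetical and the linear method is deliberately left out.

```python
def fill_sketch(values, method='zero', fill_limit=None):
    """Fill None entries in a list; this sketch supports 'zero' and 'pad' only."""
    if method not in ('zero', 'pad'):
        raise ValueError('this sketch handles zero and pad only')
    filled = []
    run = 0  # consecutive missing values seen so far
    previous = None  # last valid value, used by pad
    for value in values:
        if value is not None:
            run = 0
            previous = value
            filled.append(value)
            continue
        run += 1
        if fill_limit is not None and run > fill_limit:
            filled.append(None)  # limit reached: leave the value invalid
        elif method == 'zero':
            filled.append(0)
        else:
            filled.append(previous)  # stays None until a valid value is seen
    return filled


series = [1, None, None, 4]
zero_filled = fill_sketch(series)
```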

filter(op)

Filter the event stream using an operator

Parameters:op (function) – A function that returns True or False
Returns:The Pipeline
Return type:Pipeline
first()

Get the first processor

Returns:A pipeline processor.
Return type:Processor
from_source(src)

Note: originally named from() in JS code.

The source to get events from. The source needs to be able to iterate its events using for..of loop for bounded Ins, or be able to emit() for unbounded Ins. The actual batch, or stream connection occurs when an output is defined with to().

Pipelines can be chained together since a source may be another Pipeline.

Parameters:src (Bounded, Stream or Pipeline) – The source for the Pipeline, or another Pipeline.
Returns:The Pipeline.
Return type:Pipeline
get_emit_on()

Get the emit on (eachEvent, etc).

Returns:The emit on string (discard, flush, etc).
Return type:str
get_group_by()

Get the group by callback.

Returns:Returns the group by function.
Return type:function
get_utc()

Get the UTC state.

Returns:In UTC or not.
Return type:bool
get_window_duration()

Get the window duration.

Returns:A formatted window duration.
Return type:str
get_window_type()

Get the window type (global, etc).

Returns:The window type.
Return type:str
group_by(key=None)

Sets a new groupBy expression. Returns a new Pipeline.

Grouping is a state set on the Pipeline. Operations downstream of the group specification will use that state. For example, an aggregation would occur over any grouping specified.

The key to group by. You can pass in a function that takes an event as an arg and dynamically returns the group by key.

Otherwise key will be interpreted as a field_path:

  • a single field name or deep.column.path, or
  • an array style field_path [‘deep’, ‘column’, ‘path’] to a single column.

This is not a list of multiple columns, it is the path to a single column to pull group by keys from. For example, given a column called ‘status’ that contains the values ‘OK’ and ‘FAIL’, the key would be ‘status’ and two collections, OK and FAIL, will be generated.

If key is None, then the default column ‘value’ will be used.

Parameters:key (function, list or string) – The key to group by. See above.
Returns:The Pipeline
Return type:Pipeline
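
The two kinds of key can be sketched over a list of flat or nested dict payloads. group_by_sketch is a hypothetical function that returns a dict of lists. In the library the grouping is state on the Pipeline rather than a return value, so this only illustrates how keys are derived.

```python
def group_by_sketch(events, key=None):
    """Group dict payloads by a key function or by the path to one column."""
    if key is None:
        key = 'value'  # documented default column
    if callable(key):
        key_func = key
    else:
        parts = key.split('.') if isinstance(key, str) else list(key)

        def key_func(event):
            # Walk the path to a single, possibly nested, column.
            current = event
            for part in parts:
                current = current[part]
            return current

    groups = {}
    for event in events:
        groups.setdefault(key_func(event), []).append(event)
    return groups


events = [
    {'status': 'OK', 'value': 1},
    {'status': 'FAIL', 'value': 2},
    {'status': 'OK', 'value': 3},
]
by_status = group_by_sketch(events, 'status')
```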
input()

Originally called in() in JS code.

last()

Get the last processor

Returns:A pipeline processor.
Return type:Processor
map(op)

Map the event stream using an operator.

Parameters:op (function) – A function that returns a new Event.
Returns:The Pipeline.
Return type:Pipeline
mode()

Get the pipeline mode (ie: batch, stream).

Returns:The mode.
Return type:str
offset_by(offset_by, field_spec=None)

Processor to offset a set of fields by a value. Mostly used for testing processor and pipeline operations with a simple operation.

Parameters:
  • offset_by (int, float) – The amount to offset by.
  • field_spec (str, list, tuple, None, optional) –

    Column or columns to look up. Multiple deeply nested values can be retrieved with a list such as [‘can.be’, ‘done.with’, ‘this.notation’], and a single deep value with a string.like.this.

    If None, the default ‘value’ column will be used.

Returns:

The modified Pipeline.

Return type:

Pipeline

rate(field_spec=None, allow_negative=True)

derivative entry point

results_done()

Set result state as done.

select(field_spec=None)

Select a subset of columns.

Parameters:field_spec (str, list, tuple, None, optional) –

Column or columns to look up. Multiple deeply nested values can be retrieved with a list such as [‘can.be’, ‘done.with’, ‘this.notation’], and a single deep value with a string.like.this.

If None, the default ‘value’ column will be used.

Returns:The Pipeline.
Return type:Pipeline
take(limit)

Take events up to the supplied limit, per key.

Parameters:
  • limit (int) – Integer number of events to take.
  • global_flush (bool, optional) – If set to true (default is False) then the Taker will send out a single .flush() event if the limit has been exceeded and the window_type is ‘global.’ This can be used as a fail safe with processors that cache events (like the Filler) to ensure all events are emitted when the Pipeline is used in ‘stream’ mode. This is not needed in ‘batch’ mode because the flush signal is sent automatically.
Returns:

The Pipeline.

Return type:

Pipeline

to(out, observer=None, options=<pypond.util.Options object>)

Sets up the destination sink for the pipeline.

For a batch mode connection, i.e. one with a Bounded source, the output is connected to a clone of the parts of the Pipeline dependencies that lead to this output. This is done by a Runner. The source input is then iterated over to process all events into the pipeline and through to the Out.

For stream mode connections, the output is connected and from then on any events added to the input will be processed down the pipeline to the out.

def cback(event):
    do_something_with_the_event(event)

timeseries = TimeSeries(IN_OUT_DATA)

(
    Pipeline()
    .from_source(timeseries)
    .emit_on('flush')
    .collapse(['in', 'out'], 'total', Functions.sum())
    .aggregate(dict(total=Functions.max()))
    .to(EventOut, cback)
)

NOTE: arg list has been changed from the ordering in the JS source to conform to python convention.

Parameters:
  • out (EventOut, CollectionOut, etc instance) – The output.
  • observer (function or instance) – The observer.
  • options (Options, optional) – Options.
Returns:

The Pipeline.

Return type:

Pipeline

to_event_list()

Directly return the results from the processor rather than passing a callback in.

Returns:Returns the _results attribute with events.
Return type:list or dict
to_keyed_collections()

Directly return the results from the processor rather than passing a callback in.

Returns:Returns the _results attribute from a Pipeline object after processing. Will contain Collection objects.
Return type:list or dict
window_by(window_or_duration=None, utc=True)

Set the window, returning a new Pipeline. A new window will have a type and duration associated with it. Current available types are:

  • fixed (e.g. every 5m)
  • calendar based windows (e.g. every month)

Windows are a type of grouping. Typically you’d define a window on the pipeline before doing an aggregation or some other operation on the resulting grouped collection. You can combine window-based grouping with key-grouping (see group_by()).

There are several ways to define a window. The general format is an options object containing a type field and a duration field.

Currently the only accepted type is fixed, but others are planned. For duration, this is a duration string, for example “30s” or “1d”. Supported are: seconds (s), minutes (m), hours (h) and days (d).

The argument here is either a string or an object with string attrs type and duration. The arg can be either a window or a duration.

If no arg is supplied or set to None, the window_type is set to ‘global’ and there is no duration.

There is also a short-cut notation for a fixed window or a calendar window. Simply supplying the duration string (“30s” for example) will result in a fixed window type with the supplied duration.

The window_or_duration argument may be:

  • A fixed interval duration (see next): “fixed”
  • A calendar interval: “daily”, “monthly” or “yearly”

Duration is of the form:

  • “30s” or “1d” etc - supports seconds (s), minutes (m), hours (h), days (d). When duration is passed as the arg, window_type is set to ‘fixed’.
Parameters:
  • window_or_duration (string, Capsule) – See above.
  • utc (bool) – How to render the aggregations - in UTC vs. the user’s local time. Can not be set to False if using a fixed window size.
Returns:

The Pipeline.

Return type:

Pipeline
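
The duration strings described above can be illustrated with a small standalone sketch. This is not pypond's own parser: the helper names parse_duration_ms and classify_window are hypothetical, and the sketch only assumes the rules stated in the text (s, m, h and d suffixes, a bare duration implies a fixed window, None implies a global window).

```python
import re

# Milliseconds per unit for the suffixes listed above (s, m, h, d).
UNIT_MS = {"s": 1000, "m": 60 * 1000, "h": 60 * 60 * 1000, "d": 24 * 60 * 60 * 1000}
CALENDAR_WINDOWS = ("daily", "monthly", "yearly")


def parse_duration_ms(duration):
    """Convert a duration string such as '30s' or '1d' to milliseconds."""
    match = re.fullmatch(r"(\d+)([smhd])", duration)
    if match is None:
        raise ValueError("unsupported duration string: {0!r}".format(duration))
    count, unit = match.groups()
    return int(count) * UNIT_MS[unit]


def classify_window(window_or_duration=None):
    """Hypothetical helper: return a (window_type, duration_ms) pair."""
    if window_or_duration is None:
        # No argument: a single global window with no duration.
        return ("global", None)
    if window_or_duration in CALENDAR_WINDOWS:
        # Calendar windows carry no fixed duration.
        return (window_or_duration, None)
    # A bare duration string implies a fixed window.
    return ("fixed", parse_duration_ms(window_or_duration))
```

For example, classify_window("30s") yields a fixed window of 30000 ms, while classify_window() yields the global window.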

class pypond.pipeline.Runner(pline, output)

Bases: pypond.bases.PypondBase

A runner is used to extract the chain of processing operations from a Pipeline given an Output. The idea here is to traverse back up the Pipeline(s) and build an execution chain.

When the runner is started, events from the “in” are streamed into the execution chain and output to the “out”.

Rebuilding in this way enables us to handle connected pipelines:

                   |--
in --> pipeline ---.
                   |----pipeline ---| -> out

The runner breaks this into the following for execution:

_input        - the "in" or from() bounded input of
                the upstream pipeline
_processChain - the process nodes in the pipelines
                leading to the out
_output       - the supplied output destination for
                the batch process
NOTE: There’s currently no way to merge multiple sources, though TimeSeries has a static timeseries_list_merge() method for this purpose.
Parameters:
  • pline (Pipeline) – The pipeline to run.
  • output (PipelineOut) – The output driving this runner
start(force=False)

Start the runner.

Parameters:force (bool, optional) – Force a flush at the end of the batch source to cause any buffers to emit.
pypond.pipeline.default_callback(*args)

Default no-op callback for group_by in the Pipeline constructor.

pypond.range module

Implementation of Pond TimeRange classes.

http://software.es.net/pond/#/timerange

class pypond.range.TimeRange(instance_or_begin, end=None)

Bases: pypond.range.TimeRangeBase

Builds a new TimeRange. First arg may be of several different formats:

  • Another TimeRange (copy constructor)
  • A python tuple, list or pyrsistent.PVector object containing two python datetime objects or ms timestamps.
  • Two arguments, begin and end, each of which may be a datetime object, or a ms timestamp.
Parameters:
  • instance_or_begin (TimeRange, iterable, int or datetime.datetime) – See above for variations.
  • end (int or datetime.datetime, optional) – Optional arg for the end of the time range.
Raises:

TimeRangeException – Raised to indicate errors with args.

__str__()

string repr method.

Returns:String repr method.
Return type:str
begin()

Returns the begin time of the TimeRange.

Returns:The begin time.
Return type:datetime.datetime
contains(other)

Returns true if other is completely inside this.

Parameters:other (TimeRange) – Another time range object.
Returns:Returns true if other range is completely inside this one.
Return type:bool
disjoint(other)

Returns true if the passed in other Range in no way overlaps this time Range.

Parameters:other (TimeRange) – Another time range object.
Returns:Returns true if other range in no way overlaps this one.
Return type:bool
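
The range predicates (contains and disjoint here, overlaps and within further down) can be sketched on plain (begin, end) tuples of ms timestamps. This is an illustration only, not pypond's implementation; treating both endpoints as inclusive is an assumption.

```python
def contains(this, other):
    """True if other lies completely inside this (endpoints inclusive)."""
    return this[0] <= other[0] and other[1] <= this[1]


def within(this, other):
    """True if this lies completely inside other."""
    return contains(other, this)


def disjoint(this, other):
    """True if the two ranges share no point in time."""
    return this[1] < other[0] or this[0] > other[1]


def overlaps(this, other):
    """True if the two ranges share at least one point in time."""
    return not disjoint(this, other)
```

Under these assumptions, within is contains with the arguments swapped, and overlaps is the negation of disjoint.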
duration()

Returns the duration of the TimeRange in milliseconds.

Returns:Duration in ms.
Return type:int
end()

Returns the end time of the TimeRange.

Returns:The end time.
Return type:datetime.datetime
equals(other)

Returns True if the two TimeRanges can be considered equal, in that they have the same times.

Parameters:other (TimeRange) – Another time range object.
Returns:True if both objects represent the same time range.
Return type:bool
extents(other)

Returns a new TimeRange which covers the extents of this and other combined.

Parameters:other (TimeRange) – Another time range object
Returns:New time range which covers the extents of this and the other range combined.
Return type:TimeRange
humanize()

Returns a human friendly version of the TimeRange, e.g. “Aug 1, 2014 05:19:59 am to Aug 1, 2014 07:41:06 am”

This displays in local time, so don’t freak out.

Returns:Human friendly time range string.
Return type:str
humanize_duration()

Humanize the duration.

Returns:Humanized duration string.
Return type:str
intersection(other)

Returns a new TimeRange which represents the intersection (overlapping) part of this and other.

Parameters:other (TimeRange) – Another time range object.
Returns:A new time range object representing the intersection (overlapping) part of this and the other.
Return type:TimeRange
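
As a rough illustration of extents() and intersection(), the same arithmetic can be written on plain (begin, end) tuples of ms timestamps. This is a sketch rather than the library code; returning None when the ranges do not overlap is an assumption.

```python
def extents(this, other):
    """Smallest range covering both this and other."""
    return (min(this[0], other[0]), max(this[1], other[1]))


def intersection(this, other):
    """Overlapping part of this and other, or None if there is none."""
    begin = max(this[0], other[0])
    end = min(this[1], other[1])
    if begin > end:
        # Assumption: disjoint ranges have no intersection to return.
        return None
    return (begin, end)
```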
static last_day()

Generate a time range spanning last 24 hours

Returns:A new time range object of the requested duration.
Return type:TimeRange
static last_month()

Generate a time range spanning last month.

Returns:A new time range object of the requested duration.
Return type:TimeRange
static last_ninety_days()

Generate a time range spanning last 90 days

Returns:A new time range object of the requested duration.
Return type:TimeRange
static last_seven_days()

Generate a time range spanning last 7 days

Returns:A new time range object of the requested duration.
Return type:TimeRange
static last_thirty_days()

Generate a time range spanning last 30 days

Returns:A new time range object of the requested duration.
Return type:TimeRange
overlaps(other)

Returns true if the passed in other TimeRange overlaps this time Range.

Parameters:other (TimeRange) – Another time range object.
Returns:Returns true if other range overlaps this one.
Return type:bool
range()

Returns the internal range, which is an Immutable List containing begin and end values.

Returns:Immutable list containing the range.
Return type:pyrsistent.pvector
relative_string()

Returns a human friendly version of the TimeRange, e.g. “a few seconds ago to a month ago”

Returns:Another human friendly duration string.
Return type:str
set_begin(dtime)

Sets a new begin time on the TimeRange. The result will be a new TimeRange.

Parameters:dtime (datetime.datetime) – New time range boundary.
Returns:A new time range object reflecting the new range bounds.
Return type:TimeRange
Raises:TimeRangeException – Raised on invalid arg.
set_end(dtime)

Sets a new end time on the TimeRange. The result will be a new TimeRange.

Parameters:dtime (datetime.datetime) – New time range boundary.
Returns:A new time range object reflecting the new range bounds.
Return type:TimeRange
Raises:TimeRangeException – Raised on invalid arg.
to_json()

Returns the TimeRange as a python list of two ms timestamps.

Returns:List of two timestamps.
Return type:list
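
The conversion between aware datetime objects and the ms timestamps used by to_json() can be sketched with the standard library alone. The helper names to_ms, from_ms and range_to_json are hypothetical and are not part of pypond.

```python
from datetime import datetime, timedelta, timezone

# Reference point for epoch milliseconds.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_ms(dtime):
    """Aware datetime -> integer milliseconds since the epoch."""
    # Integer division of timedeltas avoids float rounding.
    return (dtime - EPOCH) // timedelta(milliseconds=1)


def from_ms(ms):
    """Integer milliseconds since the epoch -> aware UTC datetime."""
    return EPOCH + timedelta(milliseconds=ms)


def range_to_json(begin, end):
    """Render a begin/end pair as a list of two ms timestamps."""
    return [to_ms(begin), to_ms(end)]
```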
to_local_string()

Returns the TimeRange as a string expressed in local time.

Returns:Timerange as a string.
Return type:str
to_string()

Returns the TimeRange as a string, useful for serialization.

Returns:String representation of the range.
Return type:str
to_utc_string()

Returns the TimeRange as a string expressed in UTC time.

Returns:Timerange as string.
Return type:str
within(other)

Returns true if this TimeRange is completely within the supplied other TimeRange.

Parameters:other (TimeRange) – Another time range object.
Returns:Returns true if this range is completely inside the other one.
Return type:bool
class pypond.range.TimeRangeBase

Bases: pypond.bases.PypondBase

Base for TimeRange

static awareness_check(dtime)

Check input to make sure datetimes are aware. Presumes an iterable containing datetimes, but will fail over to process a single datetime object via duck typing.

Parameters:dtime (list, tuple or pvector, but will fail over to datetime) – An iterable of datetime objects.
Raises:TimeRangeException – Raised if a non-aware datetime object is found.
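
A minimal sketch of such an awareness check, using only the standard library. It follows the rule from the Python datetime documentation for telling aware from naive objects; ValueError stands in for TimeRangeException, and the function bodies are illustrative rather than pypond's own.

```python
from datetime import datetime, timezone


def is_aware(dtime):
    """A datetime is aware if tzinfo is set and yields a UTC offset."""
    return dtime.tzinfo is not None and dtime.tzinfo.utcoffset(dtime) is not None


def awareness_check(dtime):
    """Accept an iterable of datetimes, or fail over to a single datetime."""
    try:
        items = list(dtime)
    except TypeError:
        # A single datetime is not iterable, so wrap it (duck typing).
        items = [dtime]
    for item in items:
        if not is_aware(item):
            # Stand-in for TimeRangeException in this sketch.
            raise ValueError("naive datetime found: {0!r}".format(item))
```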
static sanitize_list_input(list_type)

Validate input when a pvector, list or tuple is passed in as a constructor arg.

Parameters:list_type (list, tuple or pvector) – Iterable containing args (epoch ms or datetime) that was passed to the constructor.
Returns:Immutable list-like object with two elements - the beginning and ending datetime of the range.
Return type:pyrsistent.pvector
Raises:TimeRangeException – Raised if bad args have been passed in.
static validate_range(range_obj)

Make sure that the end time is not chronologically before the begin.

Parameters:range_obj (pyrsistent.pvector) – The internal begin/end immutable range object.
Raises:TimeRangeException – Raised if end arg is earlier in time than begin.

pypond.series module

Implements the Pond TimeSeries class.

http://software.es.net/pond/#/timeseries

class pypond.series.TimeSeries(instance_or_wire)

Bases: pypond.bases.PypondBase

A TimeSeries is a Series where each event is an association of a timestamp and some associated data.

Data passed into it may have the following format, which is our wire format:

{
"name": "traffic",
"columns": ["time", "value", ...],
"points": [
   [1400425947000, 52, ...],
   [1400425948000, 18, ...],
   [1400425949000, 26, ...],
   [1400425950000, 93, ...],
   ...
 ]
}

Alternatively, the TimeSeries may be constructed from a list of Event objects.

Internally the above series is represented as two parts:

  • Collection - an Immutable.List of Events and associated methods to query and manipulate that list
  • Meta data - an Immutable.Map of extra data associated with the TimeSeries

The events stored in the collection may be Events (timestamp based), TimeRangeEvents (time range based) or IndexedEvents (an alternative form of a time range, such as “2014-08” or “1d-1234”).

The timerange associated with a TimeSeries is simply the bounds of the events within it (i.e. the min and max times).

Initialize a TimeSeries object from:

  • Another TimeSeries/copy ctor
  • An event list
  • From the wire format
Parameters:instance_or_wire (TimeSeries, list of events, wire format) – See above
Raises:TimeSeriesException – Raised when args can not be properly handled.
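
To make the wire format concrete, the sketch below pairs each point with the column names using plain Python. The rows_from_wire helper is hypothetical, the "in" and "out" columns and their values are made up for illustration, and this is not how TimeSeries itself parses its input.

```python
# Illustrative wire-format payload; the column values are made up.
wire = {
    "name": "traffic",
    "columns": ["time", "in", "out"],
    "points": [
        [1400425947000, 52, 34],
        [1400425948000, 18, 13],
    ],
}


def rows_from_wire(wire_data):
    """Return a list of (timestamp_ms, data dict) tuples, one per point."""
    # The first column is the time key; the rest name the data values.
    _time_column, *value_columns = wire_data["columns"]
    rows = []
    for point in wire_data["points"]:
        timestamp, *values = point
        rows.append((timestamp, dict(zip(value_columns, values))))
    return rows
```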
event_type_map

dict – Map text keys from wire format to the appropriate Event class.

__str__()

call to_string()

aggregate(func, field_path=None)

Aggregates the events down using a user defined function to do the reduction.

Parameters:
  • func (function) – Function to pass to map reduce to aggregate.
  • field_path (str, list, tuple, None, optional) –

    Name of a single value to look up. “Deep” syntax is either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value’. If None, defaults to [‘value’].

Returns:

Dict of reduced values

Return type:

dict
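
The three accepted field_path spellings can be sketched as a lookup in nested dicts. normalize_field_path and get_value are hypothetical names used only for this illustration; pypond's own lookup may differ in detail.

```python
def normalize_field_path(field_path=None):
    """Turn None, 'deep.value', a list or a tuple into a list of keys."""
    if field_path is None:
        return ["value"]
    if isinstance(field_path, str):
        return field_path.split(".")
    return list(field_path)


def get_value(data, field_path=None):
    """Walk nested dicts along the normalized field path."""
    current = data
    for key in normalize_field_path(field_path):
        current = current[key]
    return current
```

All of ['deep', 'value'], ('deep', 'value',) and 'deep.value' resolve to the same nested entry in this sketch.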

align(field_spec=None, window='5m', method='linear', limit=None)

Align entry point

at(i)

Access the series events via numeric index

Parameters:i (int) – An array index
Returns:The Event object found at index i
Return type:Event
at_first()

Return first event in the series

Returns:The first event in the series.
Return type:Event
at_last()

Return last event in the series

Returns:The last event in the series.
Return type:Event
at_time(time)

Return an event in the series by its time. This is the same as calling bisect first and then using at with the index.

Parameters:time (datetime.datetime) – A datetime object
Returns:The event at the designated time.
Return type:Event
avg(field_spec=None, filter_func=None)

Get avg

Parameters:
  • field_path (str, list, tuple, None, optional) –

    Name of a single value to look up. “Deep” syntax is either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value’. If None, defaults to [‘value’].

  • filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns:

Average value

Return type:

int or float
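
The role of filter_func can be sketched with two stand-in filters over a plain list of values. The function names below are hypothetical and are not claimed to match the Filters class; they only illustrate dropping or zeroing missing values (None, NaN, empty string) before the reduction.

```python
import math


def is_missing(value):
    """None, the empty string and NaN count as missing."""
    if value is None or value == "":
        return True
    return isinstance(value, float) and math.isnan(value)


def ignore_missing(values):
    """Drop missing values before aggregating."""
    return [v for v in values if not is_missing(v)]


def zero_missing(values):
    """Replace missing values with zero before aggregating."""
    return [0 if is_missing(v) else v for v in values]


def avg(values, filter_func=None):
    """Average the values, optionally cleaning them first."""
    if filter_func is not None:
        values = filter_func(values)
    return sum(values) / len(values)
```

Without a filter, a None in the input makes sum() raise a TypeError, which is the kind of error the parameter description warns about.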

begin()

Gets the earliest time represented in the TimeSeries.

Returns:The begin time of the underlying time range.
Return type:datetime.datetime
begin_timestamp()

Gets the earliest time represented in the TimeSeries in epoch ms.

Returns:The begin time of the underlying time range in epoch ms.
Return type:int
bisect(dtime, b=0)

Finds the index that is just less than the supplied time dtime. In other words every event at the returned index or less has a time before the supplied dtime, and every event after the index has a time later than the supplied dtime.

Optionally supply a begin index b to start searching from. Returns the greatest index whose event is still before dtime.

Parameters:
  • dtime (datetime.datetime) – Date time object to search with
  • b (int, optional) – An index position to start searching from.
Returns:

The index of the Event searched for by dtime.

Return type:

int
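
A binary search of this kind can be sketched with the standard bisect module over a sorted list of ms timestamps. This is not pypond's implementation; in particular, counting an event whose time exactly equals the search time as a match is an assumption.

```python
from bisect import bisect_right


def bisect_index(times, t, b=0):
    """Index of the last entry in sorted times that is at or before t.

    Returns b - 1 (so -1 when b is 0) if every entry from b on is later than t.
    """
    return bisect_right(times, t, lo=b) - 1
```

The optional b argument narrows the search to the tail of the list, mirroring the begin index described above.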

static build_metadata(meta)

Build the metadata out of the incoming wire format

Parameters:meta (dict) – Incoming wire format.
Returns:Immutable dict of metadata
Return type:pyrsistent.pmap
clean(field_path=None)

Returns a new TimeSeries by testing the field_path values for being valid (not NaN or None). The resulting TimeSeries will be clean for that field_path.

Parameters:field_path (str, list, tuple, None, optional) –

Name of value to look up. “Deep” syntax is either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value’. If None, defaults to [‘value’].

Returns:New time series from clean values from the field spec.
Return type:TimeSeries
collapse(field_spec_list, name, reducer, append=True)

Takes a field_spec_list (list of column names) and collapses them to a new column which is the reduction of the matched columns in the field_spec_list.

Parameters:
  • field_spec_list (list) – List of columns to collapse. Deeply nested values can be retrieved with dotted notation, e.g. [‘can.be’, ‘done.with’, ‘this.notation’].
  • name (str) – Name of new column containing collapsed values.
  • reducer (function) – Reducer function applied to the matched columns.
  • append (bool, optional) – If True, append the collapsed column to the existing data; otherwise start from a fresh data payload.
Returns:

A new time series from the collapsed columns.

Return type:

TimeSeries
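
The effect of collapse on a single event's data can be sketched on a plain dict. This is an illustration under the assumption that the reducer receives the list of matched column values; the builtin sum stands in for Functions.sum().

```python
def collapse(row, field_spec_list, name, reducer, append=True):
    """Reduce the listed columns of one row into a new column."""
    collapsed = reducer([row[column] for column in field_spec_list])
    # Keep the existing columns when appending, else start fresh.
    result = dict(row) if append else {}
    result[name] = collapsed
    return result
```

For a row {'in': 1, 'out': 2}, collapsing ['in', 'out'] into 'total' with sum adds a 'total' column of 3; with append=False only 'total' remains.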

collect_by_fixed_window(window_size)

Collect the events of the series into fixed windows of the given size.

Parameters:window_size (str) – The window size - 1d, 6h, etc
Returns:Returns the _results attribute from a Pipeline object after processing. Will contain Collection objects.
Return type:list or dict
collection()

Returns the internal collection of events for this TimeSeries

Returns:Internal collection.
Return type:Collection
columns()

Create a list of the underlying columns.

Due to the nature of the event data and using dicts, the order of the column list might be somewhat unpredictable. When generating points, this is solved by passing the column list to .to_point() as an optional argument to ensure that the points and the columns are properly aligned.

Returns:List of column names.
Return type:list
count()

Alias for size().

Returns:Number of rows in series.
Return type:int
crop(timerange)

Crop the TimeSeries to the specified TimeRange and return a new TimeSeries

Parameters:timerange (TimeRange) – Bounds of the new TimeSeries
Returns:The new cropped TimeSeries instance.
Return type:TimeSeries
daily_rollup(aggregation, to_events=False, utc=False)

Builds a new TimeSeries by dividing events into days. The days are in either local or UTC time, depending on if utc(true) is set on the Pipeline.

Each window then has an aggregation specification applied as aggregation. This specification describes a mapping of output columns to fieldNames to aggregation functions. For example:

{
    'in_avg': {'in': Functions.avg()},
    'out_avg': {'out': Functions.avg()},
    'in_max': {'in': Functions.max()},
    'out_max': {'out': Functions.max()},
}

will perform avg and max aggregations on the “in” and “out” columns, across all events within each day, and the results will be put into the 4 new columns in_avg, out_avg, in_max and out_max.

Example:

timeseries = TimeSeries(data)
daily_max_temp = timeseries.daily_rollup(
    {'max_temp': {'temperature': Functions.max()}}
)

This helper function defaults to rendering the aggregations in local time. The reason for this is that rendering in local time makes the most sense on the client side, for example when rendering a timeseries chart. A chart rendered in UTC might not make much sense to the user.

Since this is now being used in server-side applications, the optional arg utc can be set to True to force it to render in UTC instead.

Probably best to favor using .fixed_window_rollup() when wanting to render in UTC.

Parameters:
  • aggregation (dict) – The aggregation specification e.g. {‘max_temp’: {‘temperature’: Functions.max()}}
  • to_events (bool, optional) – Do conversion to Event objects
  • utc (bool, optional) – Render the aggregations in UTC rather than local time.
Returns:

The resulting rolled up TimeSeries.

Return type:

TimeSeries

end()

Gets the latest time represented in the TimeSeries.

Returns:The end time of the underlying time range.
Return type:datetime.datetime
end_timestamp()

Gets the latest time represented in the TimeSeries in epoch ms.

Returns:The end time of the underlying time range in epoch ms.
Return type:int
static equal(series1, series2)

Check equality - same instance.

Parameters:
  • series1 (TimeSeries) – A time series.
  • series2 (TimeSeries) – Another time series.
Returns:

Are the two the same instance?

Return type:

bool

event_type_map = {'index': <class 'pypond.indexed_event.IndexedEvent'>, 'timerange': <class 'pypond.timerange_event.TimeRangeEvent'>, 'time': <class 'pypond.event.Event'>}
events()

Generator to allow for loops over series.events()

Returns:Generator for loops.
Return type:iterator
fill(field_spec=None, method='zero', fill_limit=None)

Take the data in this timeseries and “fill” any missing or invalid values. This could be setting None values to zero so mathematical operations will succeed, interpolate a new value, or pad with the previously given value.

Parameters:
  • field_spec (str, list, tuple, None, optional) –

    Column or columns to look up. Multiple deeply nested values can be retrieved with dotted notation, e.g. [‘can.be’, ‘done.with’, ‘this.notation’]; a single deep value with a string.like.this.

    If None, the default column field ‘value’ will be used.

  • method (str, optional) – Filling method: zero | linear | pad
  • fill_limit (None, optional) – Set a limit on the number of consecutive events that will be filled before it starts returning invalid values. For linear fill, no filling will happen if the limit is reached before a valid value is found.
Returns:

A clone of this TimeSeries with a new Collection generated by the fill operation.

Return type:

TimeSeries
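
The three filling methods and the fill_limit rule can be sketched over a plain list in which None marks a missing value. This is a simplified illustration, not the library's filler; leaving leading gaps unfilled for pad, and interpolating only when both neighbours exist, are assumptions.

```python
def fill(values, method="zero", fill_limit=None):
    """Return a copy of values with runs of None filled in."""
    if method not in ("zero", "pad", "linear"):
        raise ValueError("unknown fill method: {0!r}".format(method))
    result = list(values)
    n = len(result)
    i = 0
    while i < n:
        if result[i] is not None:
            i += 1
            continue
        # result[i:j] is one consecutive run of missing values.
        j = i
        while j < n and result[j] is None:
            j += 1
        run = j - i
        prev = result[i - 1] if i > 0 else None
        nxt = result[j] if j < n else None
        if method == "linear":
            # Interpolate only if both neighbours exist and the run fits the limit.
            fits = fill_limit is None or run <= fill_limit
            if prev is not None and nxt is not None and fits:
                step = (nxt - prev) / (run + 1)
                for k in range(run):
                    result[i + k] = prev + step * (k + 1)
        else:
            # zero and pad fill at most fill_limit entries of the run.
            count = run if fill_limit is None else min(run, fill_limit)
            fill_value = 0 if method == "zero" else prev
            if fill_value is not None:
                for k in range(count):
                    result[i + k] = fill_value
        i = j
    return result
```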

fixed_window_rollup(window_size, aggregation, to_events=False)

Builds a new TimeSeries by dividing events within the TimeSeries across multiple fixed windows of size window_size.

Note that these are windows defined relative to Jan 1st, 1970, and are UTC, so this is best suited to smaller window sizes (hourly, 5m, 30s, 1s etc), or in situations where you don’t care about the specific window, just that the data is smaller.

Each window then has an aggregation specification applied as aggregation. This specification describes a mapping of output columns to fieldNames to aggregation functions. For example:

{
    'in_avg': {'in': Functions.avg()},
    'out_avg': {'out': Functions.avg()},
    'in_max': {'in': Functions.max()},
    'out_max': {'out': Functions.max()},
}

will perform avg and max aggregations on the “in” and “out” columns, across all events within each window, and the results will be put into the 4 new columns in_avg, out_avg, in_max and out_max.

Example:

timeseries = TimeSeries(data)
daily_avg = timeseries.fixed_window_rollup('1d',
    {'value_avg': {'value': Functions.avg()}}
)
Parameters:
  • window_size (str) – The size of the window, e.g. ‘6h’ or ‘5m’
  • aggregation (dict) – The aggregation specification
  • to_events (bool, optional) – Convert to events
Returns:

The resulting rolled up TimeSeries

Return type:

TimeSeries
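
How epoch-relative fixed windows and the aggregation specification fit together can be sketched with plain tuples and dicts. This is not the Pipeline-based implementation; rows are assumed to be (timestamp_ms, data dict) pairs, and the builtins max and sum stand in for the Functions reducers.

```python
from collections import defaultdict


def window_start(timestamp_ms, window_ms):
    """Start of the epoch-aligned window containing the timestamp."""
    return timestamp_ms - (timestamp_ms % window_ms)


def fixed_window_rollup(rows, window_ms, aggregation):
    """Group rows into fixed windows and reduce each window.

    aggregation maps output column -> {input column: reducer}.
    """
    buckets = defaultdict(list)
    for timestamp_ms, data in rows:
        buckets[window_start(timestamp_ms, window_ms)].append(data)
    result = []
    for start in sorted(buckets):
        out = {}
        for out_column, spec in aggregation.items():
            for in_column, reducer in spec.items():
                out[out_column] = reducer([d[in_column] for d in buckets[start]])
        result.append((start, out))
    return result
```

Because the windows are aligned to Jan 1st, 1970 UTC, a 60000 ms window always starts on a whole UTC minute regardless of the first event's time.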

hourly_rollup(aggregation, to_events=False)

Builds a new TimeSeries by dividing events into hours. The hours are in either local or UTC time, depending on if utc(true) is set on the Pipeline.

Each window then has an aggregation specification applied as aggregation. This specification describes a mapping of output columns to fieldNames to aggregation functions. For example:

{
    'in_avg': {'in': Functions.avg()},
    'out_avg': {'out': Functions.avg()},
    'in_max': {'in': Functions.max()},
    'out_max': {'out': Functions.max()},
}

will perform avg and max aggregations on the “in” and “out” columns, across all events within each hour, and the results will be put into the 4 new columns in_avg, out_avg, in_max and out_max.

Example:

timeseries = TimeSeries(data)
hourly_max_temp = timeseries.hourly_rollup(
    {'max_temp': {'temperature': Functions.max()}}
)
Parameters:
  • aggregation (dict) – The aggregation specification e.g. {‘max_temp’: {‘temperature’: Functions.max()}}
  • to_events (bool, optional) – Do conversion to Event objects
Returns:

The resulting rolled up TimeSeries.

Return type:

TimeSeries

index()

Get the index.

Returns:The index.
Return type:Index
index_as_range()

Index returned as time range.

Returns:Index as a TimeRange or None
Return type:TimeRange
index_as_string()

Index represented as a string.

Returns:String format of Index or None.
Return type:str
is_utc()

Get the utc flag of the series data.

map(op)
Takes an operator that is used to remap events from this TimeSeries to a new set of Events. The result is returned via the callback.
Parameters:op (function) – An operator which will be passed each event and which should return a new event.
Returns:A clone of this TimeSeries with a new Collection generated by the map operation.
Return type:TimeSeries
max(field_path=None, filter_func=None)

Get max

Parameters:
  • field_path (str, list, tuple, None, optional) –

    Name of a single value to look up. “Deep” syntax is either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value’. If None, defaults to [‘value’].

  • filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns:

Max value

Return type:

int or float

mean(field_path=None, filter_func=None)

Get mean

Parameters:
  • field_path (str, list, tuple, None, optional) –

    Name of a single value to look up. “Deep” syntax is either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value’. If None, defaults to [‘value’].

  • filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns:

Mean value

Return type:

int or float

median(field_path=None, filter_func=None)

Get median

Parameters:
  • field_path (str, list, tuple, None, optional) –

    Name of a single value to look up. “Deep” syntax is either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value’. If None, defaults to [‘value’].

  • filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns:

Median value

Return type:

int or float

meta(key=None)

Returns the meta data about this TimeSeries as a JSON object

Parameters:key (str, optional) – Optional metadata key to fetch value for
Returns:Return a thawed metadata dict or the value specified by key.
Return type:dict or key/value
min(field_path=None, filter_func=None)

Get min

Parameters:
  • field_path (str, list, tuple, None, optional) –

    Name of a single value to look up. “Deep” syntax is either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value’. If None, defaults to [‘value’].

  • filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns:

Min value

Return type:

int or float

monthly_rollup(aggregation, to_events=False, utc=False)

Builds a new TimeSeries by dividing events into months. The months are in either local or UTC time, depending on if utc(true) is set on the Pipeline.

Each window then has an aggregation specification applied as aggregation. This specification describes a mapping of output columns to fieldNames to aggregation functions. For example:

{
    'in_avg': {'in': Functions.avg()},
    'out_avg': {'out': Functions.avg()},
    'in_max': {'in': Functions.max()},
    'out_max': {'out': Functions.max()},
}

will perform avg and max aggregations on the “in” and “out” columns, across all events within each month, and the results will be put into the 4 new columns in_avg, out_avg, in_max and out_max.

Example:

timeseries = TimeSeries(data)
monthly_max_temp = timeseries.monthly_rollup(
    {'max_temp': {'temperature': Functions.max()}}
)

This helper function defaults to rendering the aggregations in local time. The reason for this is that rendering in local time makes the most sense on the client side, for example when rendering a timeseries chart. A chart rendered in UTC might not make much sense to the user.

Since this is now being used in server-side applications, the optional arg utc can be set to True to force it to render in UTC instead.

Probably best to favor using .fixed_window_rollup() when wanting to render in UTC.

Parameters:
  • aggregation (dict) – The aggregation specification e.g. {‘max_temp’: {‘temperature’: Functions.max()}}
  • to_events (bool, optional) – Do conversion to Event objects
  • utc (bool, optional) – Render the aggregations in UTC rather than local time.
Returns:

The resulting rolled up TimeSeries.

Return type:

TimeSeries

name()

Get data name.

Returns:Data name.
Return type:str
percentile(perc, field_path, method='linear', filter_func=None)

Gets percentile perc within the Collection. Numpy under the hood.

Parameters:
  • perc (int) – The percentile (should be between 0 and 100)
  • field_path (str, list, tuple, None, optional) –

    Name of a single value to look up. “Deep” syntax is either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value’. If None, defaults to [‘value’].

  • method (str, optional) –

    Specifies the interpolation method to use when the desired percentile lies between two data points. Options are:

    linear: i + (j - i) * fraction, where fraction is the fractional part of the index surrounded by i and j.

    lower: i

    higher: j

    nearest: i or j whichever is nearest

    midpoint: (i + j) / 2

Returns:

The percentile.

Return type:

int or float
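
The interpolation methods listed above can be illustrated in pure Python. The method itself uses numpy under the hood, so this sketch is only meant to show what i, j and fraction refer to; the tie-breaking rule for nearest is an assumption and may differ from numpy's.

```python
import math


def percentile(values, perc, method="linear"):
    """Percentile of values using the named interpolation method."""
    ordered = sorted(values)
    if not ordered:
        raise ValueError("no values supplied")
    if not 0 <= perc <= 100:
        raise ValueError("perc must be between 0 and 100")
    # Fractional position of the percentile in the sorted data.
    position = (len(ordered) - 1) * perc / 100.0
    lower_index = math.floor(position)
    upper_index = math.ceil(position)
    i, j = ordered[lower_index], ordered[upper_index]
    fraction = position - lower_index
    if method == "linear":
        return i + (j - i) * fraction
    if method == "lower":
        return i
    if method == "higher":
        return j
    if method == "nearest":
        # Assumption: an exact tie goes to the higher neighbour.
        return i if fraction < 0.5 else j
    if method == "midpoint":
        return (i + j) / 2
    raise ValueError("unknown method: {0!r}".format(method))
```

With the values [1, 2, 3, 4] and perc=50, the position falls halfway between 2 and 3, so linear and midpoint both give 2.5, lower gives 2 and higher gives 3.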

pipeline()

Returns a new Pipeline with input source being initialized to this TimeSeries collection. This allows pipeline operations to be chained directly onto the TimeSeries to produce a new TimeSeries or Event result.

Returns:New pipeline.
Return type:Pipeline
quantile(num, field_path=None, method='linear')

Gets num quantiles within the Collection

Parameters:
  • num (int) – Number of quantiles to divide the Collection into.
  • field_path (None, optional) – The field to return as the quantile. If not set, defaults to ‘value’.
  • method (str, optional) –

    Specifies the interpolation method to use when the desired percentile lies between two data points. Options are:

    linear: i + (j - i) * fraction, where fraction is the fractional part of the index surrounded by i and j.

    lower: i

    higher: j

    nearest: i or j whichever is nearest

    midpoint: (i + j) / 2

Returns:

An array of quantiles

Return type:

list

range()

Alias for timerange()

Returns:The internal TimeRange of the underlying collection.
Return type:TimeRange
rate(field_spec=None, allow_negative=True)

derive entry point

rename_columns(rename_map)

TimeSeries.map() helper function to rename columns in the underlying events.

Takes a dict of columns to rename:

new_ts = ts.rename_columns({'in': 'new_in', 'out': 'new_out'})

Returns a new time series containing new events. Columns not in the dict will be retained and not renamed.

NOTE: as the name implies, this will only rename the main “top level” (ie: non-deep) columns. If you need more extravagant renaming, roll your own using map().

Parameters:rename_map (dict) – Dict of columns to rename.
Returns:A clone of this TimeSeries with a new Collection generated by the map operation.
Return type:TimeSeries
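
The renaming rule (top-level columns only, unlisted columns retained) can be sketched on plain dict rows. This standalone function is hypothetical and only mirrors the behaviour described above; it does not go through map().

```python
def rename_columns(rows, rename_map):
    """Return new rows with top-level columns renamed per rename_map."""
    return [
        # Columns missing from rename_map keep their original name.
        {rename_map.get(column, column): value for column, value in row.items()}
        for row in rows
    ]
```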
static same(series1, series2)

Implements JS Object.is() - same values

Parameters:
  • series1 (TimeSeries) – A time series.
  • series2 (TimeSeries) – Another time series.
Returns:

Do the two have the same values?

Return type:

bool

select(field_spec=None)

Call select on the pipeline.

Parameters:field_spec (str, list, tuple, None, optional) –

Column or columns to look up. Multiple deeply nested values can be retrieved with dotted notation, e.g. [‘can.be’, ‘done.with’, ‘this.notation’]; a single deep value with a string.like.this.

If None, the default ‘value’ column will be used.

Returns:A clone of this TimeSeries with a new Collection generated by the select operation.
Return type:TimeSeries
set_collection(coll)

Sets a new underlying collection for this TimeSeries.

Parameters:coll (Collection) – New collection to assign to this TimeSeries
Returns:New TimeSeries with Collection coll
Return type:TimeSeries
set_meta(key, value)

Change the metadata of the TimeSeries

Parameters:
  • key (str) – The metadata key
  • value (obj) – The value
Returns:

A new TimeSeries with new metadata.

Return type:

TimeSeries

set_name(name)

Set name and generate a new TimeSeries

Parameters:name (str) – New name
Returns:Return a TimeSeries with a new name.
Return type:TimeSeries
size()

Number of rows in series.

Returns:Number in the series.
Return type:int
size_valid(field_path)

Returns the number of valid items in this collection.

Uses the field_path to look up values in all events. It then counts the number that are considered valid, i.e. are not NaN or None.

Parameters:field_path (str, list, tuple, None, optional) –

Name of value to look up. “Deep” syntax is either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value’. If None, defaults to [‘value’].

Returns:Number of valid <field_path> values in the events.
Return type:int
slice(begin, end)

Perform a slice of events within the TimeSeries, returns a new TimeSeries representing a portion of this TimeSeries from begin up to but not including end. Uses typical python [slice:syntax].

Parameters:
  • begin (int) – Slice begin index
  • end (int) – Slice end index
Returns:

New instance with sliced collection.

Return type:

TimeSeries

stdev(field_path=None, filter_func=None)

Get the standard deviation.

Parameters:
  • field_path (str, list, tuple, None, optional) –

    Name of a single value to look up. “Deep” syntax is either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value’.

    If field_path is None, then [‘value’] will be the default.

  • filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns:

Standard deviation

Return type:

int or float

sum(field_path=None, filter_func=None)

Get sum

Parameters:
  • field_path (str, list, tuple, None, optional) –

    Name of a single value to look up. “Deep” syntax is either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value’.

    If field_path is None, then [‘value’] will be the default.

  • filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns:

Summed values

Return type:

int or float
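To illustrate why the filter_func argument matters for aggregations such as sum and stdev, here is a minimal sketch of cleansing values before aggregating. The functions ignore_missing, zero_missing and total are hypothetical stand-ins written for this example; they are not the filters shipped in the pypond Filters class.

```python
import math


def _is_missing(val):
    """Treat None, NaN, and the empty string as missing."""
    if val is None or val == '':
        return True
    return isinstance(val, float) and math.isnan(val)


def ignore_missing(values):
    """Hypothetical filter: drop missing values."""
    return [v for v in values if not _is_missing(v)]


def zero_missing(values):
    """Hypothetical filter: replace missing values with 0."""
    return [0 if _is_missing(v) else v for v in values]


def total(values, filter_func=None):
    """Sum values, optionally cleansing them first."""
    cleaned = filter_func(values) if filter_func is not None else values
    return sum(cleaned)


raw = [1, None, 3, '', 5]
print(total(raw, ignore_missing))
print(total(raw, zero_missing))
try:
    total(raw)  # no filter: None and '' are retained
except TypeError as err:
    print('unfiltered aggregation failed:', err)
```

The two filters give the same sum but would differ for an average or a standard deviation, because one shortens the list and the other keeps its length.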

timerange()

Returns the extents of the TimeSeries as a TimeRange.

Returns:TimeRange of the underlying collection.
Return type:TimeRange
static timeseries_list_merge(data, series_list)

Merge a list of time series.

Parameters:
  • data (dict or pvector) – Metadata to set in new TimeSeries.
  • series_list (list) – List of TimeSeries instances.
Returns:

New TimeSeries from merge.

Return type:

TimeSeries

static timeseries_list_reduce(data, series_list, reducer, field_spec=None)

Reduces a list of TimeSeries objects using a reducer function. This works by taking each event in each TimeSeries and collecting them together based on timestamp. All events for a given time are then merged together using the reducer function to produce a new Event. Those Events are then collected together to form a new TimeSeries.

Parameters:
  • data (dict or pmap) – Metadata to set in new TimeSeries.
  • series_list (list) – List of TimeSeries objects.
  • reducer (function) – reducer function
  • field_spec (list, str, None, optional) –

    Column or columns to look up. To retrieve multiple deep nested values, pass a list such as [‘can.be’, ‘done.with’, ‘this.notation’]. A single deep value can be given as a string.like.this.

    Can be set to None if the reducer does not require a field spec.

Returns:

New time series containing the mapped events.

Return type:

TimeSeries
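The group-by-timestamp-then-reduce flow described for timeseries_list_reduce can be sketched with plain data structures. This is an illustration only: each series is modeled as a list of (timestamp_ms, data_dict) pairs, and reduce_series and sum_reducer are hypothetical names, not pypond functions.

```python
from collections import defaultdict


def reduce_series(series_list, reducer, field_spec=None):
    """Group events from all series by timestamp, then reduce each group."""
    grouped = defaultdict(list)
    for series in series_list:
        for timestamp, data in series:
            grouped[timestamp].append(data)
    # Emit one reduced event per timestamp, in chronological order.
    return [
        (timestamp, reducer(grouped[timestamp], field_spec))
        for timestamp in sorted(grouped)
    ]


def sum_reducer(data_list, field_spec):
    """Hypothetical reducer: sum the named columns across one group."""
    return {
        col: sum(d[col] for d in data_list if col in d)
        for col in field_spec
    }


series_a = [(1000, {'temp': 20}), (2000, {'temp': 22})]
series_b = [(1000, {'temp': 5}), (3000, {'temp': 7})]
merged = reduce_series([series_a, series_b], sum_reducer, ['temp'])
print(merged)
```

Timestamps that appear in only one series still produce an output event, reduced over a group of size one.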

static timeseries_list_sum(data, series_list, field_spec)

Takes a list of TimeSeries and sums them together to form a new TimeSeries.

ts1 = TimeSeries(weather1)
ts2 = TimeSeries(weather2)
total = TimeSeries.timeseries_list_sum({'name': 'sum'}, [ts1, ts2], ['temp'])

Parameters:
  • data (dict) – Metadata to set in new TimeSeries.
  • series_list (list) – List of TimeSeries objects
  • field_spec (list, str, None, optional) – Column or columns to look up. To retrieve multiple deep nested values, pass a list such as [‘can.be’, ‘done.with’, ‘this.notation’]. A single deep value can be given as a string.like.this. If None, all columns will be operated on.
Returns:

New time series with summed values.

Return type:

TimeSeries

to_json()

Returns the TimeSeries as a python dict.

This is actually like json.loads(s) - produces the actual vanilla data structure.

Returns:Dictionary of columns and points
Return type:dict
to_string()

Returns the TimeSeries as a string, useful for serialization.

This mirrors the JS toString() method; in Python it is synonymous with __str__ or __unicode__.

Returns:String version of to_json() for transmission/etc.
Return type:str
yearly_rollup(aggregation, to_events=False, utc=False)

Builds a new TimeSeries by dividing events into years. The years are in either local or UTC time, depending on if utc(true) is set on the Pipeline.

Each window then has an aggregation specification applied as aggregation. This specification describes a mapping of output columns to fieldNames to aggregation functions. For example:

{
    'in_avg': {'in': Functions.avg()},
    'out_avg': {'out': Functions.avg()},
    'in_max': {'in': Functions.max()},
    'out_max': {'out': Functions.max()},
}

will perform avg and max aggregations on the “in” and “out” columns, across all events within each year, and the results will be put into the four new columns in_avg, out_avg, in_max and out_max.

Example:

timeseries = TimeSeries(data)
yearly_max_temp = timeseries.yearly_rollup(
    {'max_temp': {'temperature': Functions.max()}}
)

This helper function defaults to rendering the aggregations in localtime. The reason for this is that rendering in localtime makes the most sense on the client side, like rendering a timeseries chart. A chart rendered in UTC might not make much sense to a user.

Since this is now being used in server-side applications, the optional arg utc can be set to True to force it to render in UTC instead.

Probably best to favor using .fixed_window_rollup() when wanting to render in UTC.

Parameters:
  • aggregation (dict) – The aggregation specification e.g. {‘max_temp’: {‘temperature’: Functions.max()}}
  • to_events (bool, optional) – Do conversion to Event objects
Returns:

The resulting rolled up TimeSeries.

Return type:

TimeSeries
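How an aggregation specification maps output columns to input columns and functions can be sketched without pypond. This is a simplified model under stated assumptions: events are (aware UTC datetime, dict) pairs, windows are keyed by UTC calendar year only, plain callables stand in for Functions.avg() and Functions.max(), and yearly_rollup_sketch is a hypothetical name.

```python
from collections import defaultdict
from datetime import datetime, timezone


def avg(values):
    """Arithmetic mean of a non-empty list."""
    return sum(values) / len(values)


def yearly_rollup_sketch(events, aggregation):
    """Bucket events by UTC year and apply the aggregation spec per bucket.

    events: list of (aware UTC datetime, data dict) pairs.
    aggregation: {output_column: {input_column: function}}.
    """
    windows = defaultdict(list)
    for when, data in events:
        windows[when.year].append(data)

    rollup = {}
    for year in sorted(windows):
        row = {}
        for out_col, spec in aggregation.items():
            for in_col, func in spec.items():
                row[out_col] = func([d[in_col] for d in windows[year]])
        rollup[year] = row
    return rollup


events = [
    (datetime(2015, 3, 1, tzinfo=timezone.utc), {'in': 2, 'out': 10}),
    (datetime(2015, 9, 1, tzinfo=timezone.utc), {'in': 4, 'out': 30}),
    (datetime(2016, 1, 1, tzinfo=timezone.utc), {'in': 6, 'out': 50}),
]
spec = {
    'in_avg': {'in': avg},
    'out_max': {'out': max},
}
result = yearly_rollup_sketch(events, spec)
print(result)
```

Each output column is computed from exactly one input column over one window, which is why the spec is a dict of single-entry dicts.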

pypond.timerange_event module

TimeRangeEvent associates data with a specific time range rather than at a discrete time like Event does.

class pypond.timerange_event.TimeRangeEvent(instance_or_args, arg2=None)

Bases: pypond.event.EventBase

The creation of a TimeRangeEvent is done by combining two parts - the timerange and the data.

To construct you specify a TimeRange, along with the data.

The first arg can be:

  • a TimeRangeEvent instance (copy ctor)
  • a pyrsistent.PMap, or
  • a python tuple, list or pyrsistent.PVector object containing two python datetime objects or ms timestamps - the args for the TimeRange object.

To specify the data you can supply either:

  • a python dict
  • a pyrsistent.PMap, or
  • a simple type such as an integer. In the case of the simple type this is a shorthand for supplying {“value”: v}.
Parameters:
  • instance_or_args (TimeRange, iterable, pyrsistent.pmap) – See above
  • arg2 (dict, pmap, int, float, str, optional) – See above.
begin()

The begin time of this Event, which will be just the timestamp.

Returns:Beginning of range.
Return type:datetime.datetime
end()

The end time of this Event, which will be just the timestamp.

Returns:End of range.
Return type:datetime.datetime
humanize_duration()

Humanize the timerange.

Returns:Humanized string of the time range.
Return type:str
key()

Returns a range string in the format ‘begin,end’ as expressed as ms since the epoch.

Returns:The begin and end of the timerange in ms since the epoch.
Return type:str
set_data(data)

Sets the data portion of the event and returns a new TimeRangeEvent.

Parameters:data (dict) – The new data payload for this event object.
Returns:A new TimeRangeEvent object with the new data payload.
Return type:TimeRangeEvent

timerange()

The TimeRange of this data.

Returns:The underlying time range object.
Return type:TimeRange
timerange_as_local_string()

The timerange of this data, in Local time, as a string.

Returns:Formatted time string.
Return type:str
timerange_as_utc_string()

The timerange of this data, in UTC time, as a string.

Returns:Formatted time string
Return type:str
timestamp()

The timestamp of this Event data. It’s just the beginning of the range in this case.

Returns:Beginning of range.
Return type:datetime.datetime
to_json()
Returns the TimeRangeEvent as a JSON object, essentially
{timerange: tr, data: {key: value, ...}}

This is actually like json.loads(s) - produces the actual data structure from the object internal data.

Returns:Dict representation of internals (timerange, data).
Return type:dict
to_point(cols=None)

Returns a flat array starting with the timestamp, followed by the values.

Can be given an optional list of columns so the returned list will have the values in order. Primarily for the TimeSeries wire format.

Parameters:cols (list, optional) – List of data columns to order the data points in so the TimeSeries wire format lines up correctly. If not specified, the points will be whatever order that dict.values() decides to return it in.
Returns:Epoch ms followed by points.
Return type:list
type()

Return the type of this event.

Returns:The class of this event type.
Return type:class

pypond.util module

Various utilities for the pypond code. Primarily functions to take care of consistent handling and conversion of time values as we are trying to traffic in aware datetime objects in UTC time.

Additionally some boolean test functions and assorted other utility functions.

class pypond.util.Capsule(**kwargs)

Bases: pypond.util.Options

Straight subclass of Options so there is no confusion between this and the pipeline Options. Employing this to mimic the Javascript Object in cases where using a Python dict would cause confusion porting the code.

class pypond.util.ObjectEncoder(skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, encoding='utf-8', default=None)

Bases: json.encoder.JSONEncoder

Class to allow arbitrary python objects to be json encoded with json.dumps()/etc by defining a .to_json() method on your object.

We need this for encoding lists of custom Event (etc) objects.

Usage: json.dumps(your_cool_object, cls=ObjectEncoder)

default(obj)
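The to_json() hook described above follows the standard json.JSONEncoder.default pattern. A minimal sketch, assuming nothing about pypond's actual code: ToJsonEncoder and Reading are hypothetical classes written for this example.

```python
import json


class ToJsonEncoder(json.JSONEncoder):
    """Hypothetical stand-in for an encoder that honors .to_json()."""

    def default(self, obj):
        to_json = getattr(obj, 'to_json', None)
        if callable(to_json):
            return to_json()
        # Fall back to the base class, which raises TypeError.
        return super().default(obj)


class Reading:
    """Toy object exposing the to_json() hook."""

    def __init__(self, name, value):
        self.name = name
        self.value = value

    def to_json(self):
        return {'name': self.name, 'value': self.value}


encoded = json.dumps([Reading('in', 1), Reading('out', 2)], cls=ToJsonEncoder)
print(encoded)
```

json.dumps only calls default() for objects it cannot serialize natively, so lists, dicts and numbers inside the to_json() result are handled as usual.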
class pypond.util.Options(**kwargs)

Bases: object

Encapsulation object for Pipeline options.

Example:

o = Options(foo='bar')

and

o = Options()
o.foo = 'bar'

Are identical.
Parameters:initial (dict, optional) – Can supply keyword args for initial values.
to_dict()
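The equivalence between keyword construction and attribute assignment can be shown with a small stand-in class. OptionsSketch is hypothetical and only assumes that the options are stored as instance attributes; the real class may differ.

```python
class OptionsSketch:
    """Hypothetical attribute bag mimicking the behavior described above."""

    def __init__(self, **kwargs):
        # Keyword arguments become instance attributes.
        self.__dict__.update(kwargs)

    def to_dict(self):
        # Return a copy so callers cannot mutate the internal state.
        return dict(self.__dict__)


first = OptionsSketch(foo='bar')

second = OptionsSketch()
second.foo = 'bar'

print(first.to_dict() == second.to_dict())
```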
pypond.util.aware_dt_from_args(dtargs, localize=False)

generate an aware datetime object using datetime.datetime kwargs.

can generate a localized version as well, but please don’t.

Parameters:
  • dtargs (dict) – Dict containing the args you pass to datetime.datetime.
  • localize (bool, optional) – Will create a new object in localtime, but just don’t do it.
Returns:

New datetime object

Return type:

datetime.datetime

Raises:

UtilityException – Raised if the args are wrong type.

pypond.util.aware_utcnow()

return an aware utcnow() datetime rounded to milliseconds.

Returns:New datetime object
Return type:datetime.datetime
pypond.util.dt_from_ms(msec)

generate a datetime object from epoch milliseconds

Parameters:msec (int) – epoch milliseconds
Returns:New datetime object from ms
Return type:datetime.datetime
pypond.util.dt_is_aware(dtime)

see if a datetime object is aware

Parameters:dtime (datetime.datetime) – A datetime object
Returns:Returns True if the dtime is aware/non-naive.
Return type:bool
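For reference, the Python documentation defines a datetime as aware when its tzinfo is set and utcoffset() does not return None. A minimal sketch of that test follows; is_aware is a hypothetical name and is not claimed to match how dt_is_aware is written.

```python
from datetime import datetime, timezone


def is_aware(dtime):
    """True if dtime carries usable timezone information."""
    return dtime.tzinfo is not None and dtime.tzinfo.utcoffset(dtime) is not None


naive = datetime(2016, 1, 1)
aware = datetime(2016, 1, 1, tzinfo=timezone.utc)

print(is_aware(naive))
print(is_aware(aware))
```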
pypond.util.format_dt(dtime, localize=False)

Format for human readable output.

Parameters:
  • dtime (datetime.datetime) – A datetime object
  • localize (bool, optional) – Display as local time.
Returns:

Formatted date string.

Return type:

str

pypond.util.generate_paths(dic)

Generate a list of all possible field paths in a dict. This is for determining all paths in a dict when none is given.

Currently unused, but keeping since we will probably need it.

Parameters:dic (dict) – A dict, generally the payload from an Event class.
Returns:A list of strings of all the paths in the dict.
Return type:list
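One way to enumerate field paths in a nested dict is a depth-first walk. The sketch below makes two assumptions that the description does not confirm: paths are rendered as dotted strings, and only leaf values (not intermediate dicts) produce a path. all_paths is a hypothetical name.

```python
def all_paths(dic, prefix=''):
    """Return dotted paths to every leaf value in a nested dict."""
    paths = []
    for key, val in dic.items():
        path = '{0}.{1}'.format(prefix, key) if prefix else str(key)
        if isinstance(val, dict):
            # Recurse into nested dicts, carrying the path so far.
            paths.extend(all_paths(val, path))
        else:
            paths.append(path)
    return paths


payload = {'a': {'b': 1, 'c': {'d': 2}}, 'e': 3}
print(all_paths(payload))
```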
pypond.util.humanize_dt(dtime)

Format a datetime for humanized display.

Parameters:dtime (datetime.datetime) – A datetime object
Returns:Datetime formatted as a string.
Return type:str
pypond.util.humanize_dt_ago(dtime)

format to “23 minutes ago” style format.

Parameters:dtime (datetime.datetime) – A datetime object
Returns:Humanized string.
Return type:str
pypond.util.humanize_duration(delta)

format for a single duration value - takes datetime.timedelta as arg

Parameters:delta (datetime.timedelta) – A time delta
Returns:Humanize delta to duration.
Return type:str
pypond.util.is_function(func)

Test if a value is a function.

Parameters:func (obj) – A value
Returns:Is the object a python function?
Return type:bool
pypond.util.is_nan(val)

Test if a value is NaN

Parameters:val (obj) – A value
Returns:Is it NaN?
Return type:bool
pypond.util.is_pipeline(obj)

Test if something is a Pipeline object. This is put here with a deferred import statement to avoid circular imports so the I/O classes don’t need to import pipeline.py.

This probably does not need to be deferred but doing it for safety’s sake.

Parameters:obj (object) – An object to test to see if it’s a Pipeline.
Returns:True if Pipeline
Return type:bool
pypond.util.is_pmap(pmap)

Check this here so people don’t mistake pmap and PMap.

Parameters:pmap (obj) – An object
Returns:Returns True if it is a pyrsistent.pmap
Return type:bool
pypond.util.is_pvector(pvector)

Check this here so people don’t mistake PVector and pvector.

Parameters:pvector (obj) – An object
Returns:Returns True if it is a pyrsistent.pvector
Return type:bool
pypond.util.is_valid(val)

Test if a value is valid.

Parameters:val (obj) – A value
Returns:Is it valid?
Return type:bool
pypond.util.localtime_from_ms(msec)

generate an aware localtime datetime object from ms

Parameters:msec (int) – epoch milliseconds
Returns:New datetime object
Return type:datetime.datetime
pypond.util.localtime_info_from_utc(dtime)

Extract local TZ formatted values from an aware UTC datetime object. This is used by the index string methods when grouping data for local display.

Parameters:dtime (datetime.datetime) – An aware UTC datetime object
Returns:A dict with formatted elements (zero-padded months, etc) extracted from the local version.
Return type:dict
pypond.util.monthdelta(date, delta)

because we wish datetime.timedelta had a month kwarg.

Courtesy of: http://stackoverflow.com/a/3425124/3916180

Parameters:
  • date (datetime.date) – Date object
  • delta (int) – Month delta
Returns:

New Date object with delta offset.

Return type:

datetime.date
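Month arithmetic of this kind can be sketched with calendar.monthrange. This is an independent illustration rather than the code from the linked answer: month_shift is a hypothetical name, and clamping the day to the last day of the target month is an assumption about the desired behavior.

```python
import calendar
from datetime import date


def month_shift(day, delta):
    """Return day moved by delta months, clamping to the month's last day."""
    # Count months from year 0 so positive and negative deltas both work.
    months = day.year * 12 + (day.month - 1) + delta
    year, month_index = divmod(months, 12)
    month = month_index + 1
    last_day = calendar.monthrange(year, month)[1]
    return day.replace(year=year, month=month, day=min(day.day, last_day))


print(month_shift(date(2016, 1, 31), 1))   # clamped into February
print(month_shift(date(2016, 1, 15), -1))  # crosses a year boundary
```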

pypond.util.ms_from_dt(dtime)

Turn a datetime object into ms since epoch.

Parameters:dtime (datetime.datetime) – A datetime object
Returns:epoch milliseconds
Return type:int
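The conversion between aware UTC datetimes and epoch milliseconds can be sketched with integer timedelta arithmetic, which avoids floating point rounding. The names ms_from_datetime and datetime_from_ms are hypothetical; this is not a copy of ms_from_dt or dt_from_ms.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ms_from_datetime(dtime):
    """Whole milliseconds between the epoch and an aware datetime."""
    return (dtime - EPOCH) // timedelta(milliseconds=1)


def datetime_from_ms(msec):
    """Aware UTC datetime for a count of epoch milliseconds."""
    return EPOCH + timedelta(milliseconds=msec)


stamp = datetime(2016, 1, 1, tzinfo=timezone.utc)
msec = ms_from_datetime(stamp)
print(msec)
print(datetime_from_ms(msec) == stamp)
```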
pypond.util.nested_get(dic, keys)

Address a nested dict with a list of keys to fetch a value. This is functionally similar to the standard functools.reduce() method employing dict.get, but this returns ‘bad_path’ if the path does not exist. This is because we need to differentiate between an existing value that is actually None vs. the dict.get() failover. Would have preferred to return False, but who knows if we’ll end up with data containing Boolean values.

sample_dict = dict()
nested_set(sample_dict, ['bar', 'baz'], 23)
nested_get(sample_dict, ['bar', 'quux'])
'bad_path'

Unlike nested_set(), this will not create a new path branch if it does not already exist.

Parameters:
  • dic (dict) – The dict we are working with
  • keys (list) – A list of nested keys
Returns:

Whatever value was at the terminus of the keys.

Return type:

obj
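The sentinel behavior described above, where a missing path is reported differently from a stored None, can be sketched as follows. nested_get_sketch is a hypothetical re-implementation for illustration and may not match the library code line for line.

```python
def nested_get_sketch(dic, keys):
    """Walk dic along keys; return 'bad_path' if any step is missing."""
    current = dic
    for key in keys:
        if not isinstance(current, dict) or key not in current:
            return 'bad_path'
        current = current[key]
    return current


sample = {'bar': {'baz': 23, 'empty': None}}
print(nested_get_sketch(sample, ['bar', 'baz']))
print(nested_get_sketch(sample, ['bar', 'empty']))  # a real None value
print(nested_get_sketch(sample, ['bar', 'quux']))   # missing path
```

Because the lookup only reads, the input dict is left unchanged even when the path does not exist.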

pypond.util.nested_set(dic, keys, value)

Address a nested dict with a list of keys and set a value. If part of the path does not exist, it will be created.

sample_dict = dict()
nested_set(sample_dict, ['bar', 'baz'], 23)
{'bar': {'baz': 23}}
nested_set(sample_dict, ['bar', 'baz'], 25)
{'bar': {'baz': 25}}
Parameters:
  • dic (dict) – The dict we are working with.
  • keys (list) – A list of nested keys
  • value (obj) – Whatever we want to set the ultimate key to.
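The create-missing-branches behavior can be sketched with dict.setdefault. nested_set_sketch is a hypothetical name used for illustration; it mutates the dict in place and is not claimed to be the library's code.

```python
def nested_set_sketch(dic, keys, value):
    """Set dic[keys[0]][keys[1]]... = value, creating branches as needed."""
    current = dic
    for key in keys[:-1]:
        # setdefault returns the existing branch or inserts an empty one.
        current = current.setdefault(key, {})
    current[keys[-1]] = value


sample = {}
nested_set_sketch(sample, ['bar', 'baz'], 23)
print(sample)
nested_set_sketch(sample, ['bar', 'baz'], 25)
print(sample)
```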
pypond.util.sanitize_dt(dtime, testing=False)

Make sure the datetime object is in UTC/etc. Also round incoming datetime objects to milliseconds.

Allow disabling warnings when testing. Warning primarily exists to herd users into not passing in non-UTC tz datetime objects.

Parameters:
  • dtime (datetime.datetime) – A datetime object
  • testing (bool, optional) – Suppress warnings when testing.
Returns:

New datetime object rounded to ms from microseconds.

Return type:

datetime.datetime

pypond.util.to_milliseconds(dtime)

Check to see if a datetime object has granularity smaller than millisecond (ie: microseconds) and massage back to ms if so.

Doing this round-trip seems kludgy and inefficient, but doing this:

return dtime.replace(microsecond=round(dtime.microsecond, -3))

produced inconsistent results because of the rounding and I’m not going to start treating numbers like strings.

Parameters:dtime (datetime.datetime) – A datetime object.
Returns:New datetime object rounded down to milliseconds from microseconds.
Return type:datetime.datetime
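Rounding down to milliseconds can also be done with integer floor division on the microsecond field. This is a different technique from the round-trip mentioned above, shown only as a sketch; truncate_to_ms is a hypothetical name.

```python
from datetime import datetime, timezone


def truncate_to_ms(dtime):
    """Return a copy of dtime with sub-millisecond precision dropped."""
    return dtime.replace(microsecond=(dtime.microsecond // 1000) * 1000)


precise = datetime(2016, 1, 1, 12, 0, 0, 999999, tzinfo=timezone.utc)
trimmed = truncate_to_ms(precise)
print(trimmed.microsecond)
```

Floor division never yields a value outside the 0 to 999999 range that datetime accepts for microseconds, whereas rounding 999999 to the nearest thousand gives 1000000, which replace() rejects.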
pypond.util.unique_id(prefix='')

generate a uuid with a prefix - for debugging. This probably isn’t truly random but it’s random enough. Calling uuid.uuid4() was imposing non-trivial drag on performance. The calls to /dev/urandom can block on certain unix-like systems.

Parameters:prefix (str, optional) – Prefix for uuid.
Returns:Prefixed uuid.
Return type:str

Module contents