Skip to content

Utils

Term dataclass

Represents a bucket in a terms aggregation result.

Source code in splitgill/utils.py
75
76
77
78
79
80
81
82
83
84
@dataclass
class Term:
    """
    A single bucket taken from the result of a terms aggregation, pairing a field value
    with the number of documents that value was found in.
    """

    # the value found in the field
    value: Union[str, int, float, bool]
    # the number of documents this value appeared in
    count: int

iter_terms(search, field, chunk_size=50, sample_probability=1.0, seed=None)

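Yields Term objects, each representing a value and the number of documents which contain that value in the given field. The values are retrieved page by page using a composite aggregation, so the Terms are yielded in the order of that aggregation's buckets (sorted by value), not in order of value frequency.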

Parameters:

Name Type Description Default
search Search

a Search instance to use to run the aggregation

required
field str

the name of the field to get the terms for

required
chunk_size int

the number of buckets to retrieve per request

50
sample_probability float

the probability that a given record will be included in a random sample; set to 1 to use all records (default 1)

1.0
seed Optional[int]

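sets the seed for the random sample manually; only used when sample_probability is less than 1 (if None or not set, defaults to the timestamp of the start of the current day in seconds, divided by 3600)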

None

Returns:

Type Description
Iterable[Term]

yields Term objects

Source code in splitgill/utils.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def iter_terms(
    search: Search,
    field: str,
    chunk_size: int = 50,
    sample_probability: float = 1.0,
    seed: Optional[int] = None,
) -> Iterable[Term]:
    """
    Yields Term objects, each representing a value and the number of documents which
    contain that value in the given field. The values are retrieved page by page using
    a composite aggregation, so the Terms are yielded in the order of that aggregation's
    buckets (sorted by value), not in order of value frequency.

    :param search: a Search instance to use to run the aggregation
    :param field: the name of the field to get the terms for
    :param chunk_size: the number of buckets to retrieve per request
    :param sample_probability: the probability that a given record will be included in a
        random sample; set to 1 to use all records (default 1)
    :param seed: sets the seed for the random sample manually; only used when
        sample_probability is less than 1 (if None or not set, defaults to the
        timestamp of the start of the current day divided by 3600)
    :return: yields Term objects
    """
    sampling = sample_probability < 1

    # the paths to the buckets and the pagination key in the aggregation result
    result_keys = ['values', 'buckets']
    after_keys = ['values', 'after_key']

    if sampling:
        # if we're sampling, the core agg gets nested underneath the sampler so the
        # result paths gain an extra level
        result_keys = ['sampling'] + result_keys
        after_keys = ['sampling'] + after_keys
        if seed is None:
            # this should stay relatively constant for caching purposes, but we can
            # change it once a day.
            # divide it by 3600 just to make it fit under the ES seed max (2147483647)
            # for longer - otherwise this stops working in 2038. The actual number
            # isn't important.
            midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
            seed = int(midnight.timestamp() / 3600)

    after = None
    while True:
        # this has a dual purpose, it ensures we don't get any search results
        # when we don't need them, and it ensures we get a fresh copy of the
        # search to work with
        agg_search = search[:0]

        # this is the core aggregation
        composite_agg = A(
            'composite', size=chunk_size, sources={'value': A('terms', field=field)}
        )

        if sampling:
            agg_search.aggs.bucket(
                'sampling', 'random_sampler', probability=sample_probability, seed=seed
            ).bucket('values', composite_agg)
            if after is not None:
                agg_search.aggs['sampling'].aggs['values'].after = after
        else:
            agg_search.aggs.bucket('values', composite_agg)
            if after is not None:
                agg_search.aggs['values'].after = after

        result = agg_search.execute().aggs.to_dict()

        buckets = get_in(result_keys, result, [])
        if not buckets:
            break

        yield from (
            Term(bucket['key']['value'], bucket['doc_count']) for bucket in buckets
        )

        after = get_in(after_keys, result, None)
        if after is None:
            # without a key to continue from, the next request would start again from
            # the first page and this loop would never end, so stop here
            break

now()

Get the current datetime as a timestamp.

Source code in splitgill/utils.py
53
54
55
56
57
def now() -> int:
    """
    Returns the current time as a timestamp, i.e. the value of time() converted from
    seconds to milliseconds and truncated to an int.
    """
    seconds = time()
    return int(seconds * 1000)

parse_to_timestamp(datetime_string, datetime_format, tz=timezone.utc)

Parses the given string using the given format and returns a timestamp.

If the datetime object built from parsing the string with the given format doesn't contain a tzinfo component, then the tz parameter is added as a replacement value. This defaults to UTC.

Parameters:

Name Type Description Default
datetime_string str

the datetime as a string

required
datetime_format str

the format as a string

required
tz timezone

the timezone to use (default: UTC)

utc

Returns:

Type Description
int

the parsed datetime as the number of milliseconds since the UNIX epoch as an int

Source code in splitgill/utils.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def parse_to_timestamp(
    datetime_string: str, datetime_format: str, tz: timezone = timezone.utc
) -> int:
    """
    Reads a datetime out of the given string according to the given format and converts
    it to a timestamp.

    When the parsed datetime carries no tzinfo of its own, the tz parameter is attached
    to it before conversion (UTC unless specified otherwise). A tzinfo parsed from the
    string itself is always kept as is.

    :param datetime_string: the datetime as a string
    :param datetime_format: the format as a string
    :param tz: the timezone to use when the parsed value has none (default: UTC)
    :return: the parsed datetime as the number of milliseconds since the UNIX epoch as
        an int
    """
    parsed = datetime.strptime(datetime_string, datetime_format)
    moment = parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=tz)
    return to_timestamp(moment)

partition(iterable, size)

Partitions the given iterable into chunks. Each chunk yielded will be a list which is at most size in length. The final list yielded may be smaller if the length of the iterable isn't wholly divisible by the size.

Parameters:

Name Type Description Default
iterable Iterable

the iterable to partition

required
size int

the maximum size of list chunk to yield

required

Returns:

Type Description
Iterable[list]

yields lists

Source code in splitgill/utils.py
60
61
62
63
64
65
66
67
68
69
70
71
72
def partition(iterable: Iterable, size: int) -> Iterable[list]:
    """
    Splits the given iterable up into a series of lists, yielding each in turn. Every
    list yielded holds at most `size` elements; only the last one may hold fewer, which
    happens when the number of elements in the iterable isn't a multiple of the size.

    :param iterable: the iterable to partition
    :param size: the maximum size of list chunk to yield
    :return: yields lists
    """
    source = iter(iterable)
    while True:
        batch = list(islice(source, size))
        if not batch:
            # nothing left to pull from the source, we're done
            return
        yield batch

to_timestamp(moment)

Converts a datetime or date object into a timestamp value. The timestamp returned is an int. The timestamp value is the number of milliseconds that have elapsed between the UNIX epoch and the given moment. If the moment is a date, 00:00:00 on the day will be used.

Any precision greater than milliseconds held within the datetime is simply ignored and no rounding occurs.

Parameters:

Name Type Description Default
moment Union[datetime, date]

a datetime or date object

required

Returns:

Type Description
int

the timestamp (number of milliseconds between the UNIX epoch and the moment) as an int

Source code in splitgill/utils.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def to_timestamp(moment: Union[datetime, date]) -> int:
    """
    Converts a datetime or date object into a timestamp value. The timestamp returned is
    an int. The timestamp value is the number of milliseconds that have elapsed between
    the UNIX epoch and the given moment. If the moment is a date, 00:00:00 on the day
    will be used.

    Any precision greater than milliseconds held within the datetime is simply ignored
    and no rounding occurs. To guarantee this, the conversion is done using integer
    arithmetic on microseconds rather than by multiplying a float number of seconds,
    which can lose a millisecond to floating point error.

    Naive datetimes and dates are interpreted in the system's local timezone, as
    datetime.timestamp() does.

    :param moment: a datetime or date object
    :return: the timestamp (number of milliseconds between the UNIX epoch and the
        moment) as an int
    """
    if not isinstance(moment, datetime):
        # a plain date has no time component, so use 00:00:00 on that day
        moment = datetime(moment.year, moment.month, moment.day)

    if moment.tzinfo is None:
        # with the microseconds removed, timestamp() returns a whole number of seconds
        # which a float holds exactly, so the microseconds can be added back on as an
        # int without any loss of precision
        whole_seconds = int(moment.replace(microsecond=0).timestamp())
        micros = whole_seconds * 1_000_000 + moment.microsecond
    else:
        elapsed = moment - datetime(1970, 1, 1, tzinfo=timezone.utc)
        micros = (
            elapsed.days * 86_400 + elapsed.seconds
        ) * 1_000_000 + elapsed.microseconds

    # drop the sub-millisecond part, truncating towards zero like int() does
    millis = abs(micros) // 1000
    return millis if micros >= 0 else -millis