Skip to content

Manager

SearchVersion

Bases: Enum

Indicator for the SplitgillDatabase.search method as to which version of the data you would like to search.

Source code in splitgill/manager.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
class SearchVersion(Enum):
    """
    Tells SplitgillDatabase.search which version of the data should be searched.
    """

    # restrict the search to the latest data
    latest = 'latest'
    # search across all of the data
    all = 'all'

SplitgillClient

Splitgill client class which holds a mongo connection, an elasticsearch connection and any other general information Splitgill needs to manage the databases.

Source code in splitgill/manager.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
class SplitgillClient:
    """
    Top-level Splitgill client. It holds the MongoDB client, the Elasticsearch client
    and the other general settings Splitgill needs in order to manage its databases.
    """

    def __init__(
        self,
        mongo: MongoClient,
        elasticsearch: Elasticsearch,
        mongo_database_name: str = 'sg',
    ):
        """
        :param mongo: the pymongo client to use
        :param elasticsearch: the Elasticsearch client to use
        :param mongo_database_name: the name of the MongoDB database Splitgill's
            collections live in (default: 'sg')
        """
        self.mongo = mongo
        self.elasticsearch = elasticsearch
        self.mongo_database_name = mongo_database_name
        # looking up the lock collection relies on the mongo client and database
        # name set above, so the lock manager must be created last
        locks = self.get_lock_collection()
        self.lock_manager = LockManager(locks)

    def get_database(self, name: str) -> 'SplitgillDatabase':
        """
        Creates a SplitgillDatabase object that uses this client.

        :param name: the name of the database
        :return: a new SplitgillDatabase object
        """
        return SplitgillDatabase(name, self)

    def get_mongo_database(self) -> Database:
        """
        Looks up the MongoDB database Splitgill uses, as named by
        ``mongo_database_name``.

        :return: a pymongo Database object
        """
        name = self.mongo_database_name
        return self.mongo.get_database(name)

    def get_options_collection(self) -> Collection:
        """
        Looks up the collection holding parsing options. The collection name is a
        fixed constant, so a single collection serves every Splitgill database.

        :return: a pymongo Collection object
        """
        database = self.get_mongo_database()
        return database.get_collection(OPTIONS_COLLECTION_NAME)

    def get_data_collection(self, name: str) -> Collection:
        """
        Looks up the collection holding the records of the named Splitgill database.
        Each Splitgill database gets its own ``data-<name>`` collection.

        :param name: the name of the Splitgill database
        :return: a pymongo Collection object
        """
        database = self.get_mongo_database()
        return database.get_collection(f'data-{name}')

    def get_lock_collection(self) -> Collection:
        """
        Looks up the collection used to store locks.

        :return: a pymongo Collection object
        """
        database = self.get_mongo_database()
        return database.get_collection(LOCKS_COLLECTION_NAME)

get_data_collection(name)

Returns the data collection for the given Splitgill database.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `name` | `str` | the name of the Splitgill database | required |

Returns:

| Type | Description |
| ---- | ----------- |
| `Collection` | a pymongo Collection object |

Source code in splitgill/manager.py
74
75
76
77
78
79
80
81
def get_data_collection(self, name: str) -> Collection:
    """
    Looks up the collection holding the records of the named Splitgill database.
    Each Splitgill database gets its own ``data-<name>`` collection.

    :param name: the name of the Splitgill database
    :return: a pymongo Collection object
    """
    collection_name = f'data-{name}'
    database = self.get_mongo_database()
    return database.get_collection(collection_name)

get_database(name)

Returns a SplitgillDatabase object.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `name` | `str` | the name of the database | required |

Returns:

| Type | Description |
| ---- | ----------- |
| `SplitgillDatabase` | a new SplitgillDatabase object |

Source code in splitgill/manager.py
49
50
51
52
53
54
55
56
def get_database(self, name: str) -> 'SplitgillDatabase':
    """
    Creates a SplitgillDatabase object that uses this client.

    :param name: the name of the database
    :return: a new SplitgillDatabase object
    """
    database = SplitgillDatabase(name, self)
    return database

get_lock_collection()

Returns the locks collection.

Returns:

| Type | Description |
| ---- | ----------- |
| `Collection` | a pymongo Collection object |

Source code in splitgill/manager.py
83
84
85
86
87
88
89
def get_lock_collection(self) -> Collection:
    """
    Looks up the collection used to store locks.

    :return: a pymongo Collection object
    """
    database = self.get_mongo_database()
    return database.get_collection(LOCKS_COLLECTION_NAME)

get_mongo_database()

Returns the MongoDB database in use.

Returns:

| Type | Description |
| ---- | ----------- |
| `Database` | a pymongo Database object |

Source code in splitgill/manager.py
58
59
60
61
62
63
64
def get_mongo_database(self) -> Database:
    """
    Looks up the MongoDB database Splitgill uses, as named by
    ``mongo_database_name``.

    :return: a pymongo Database object
    """
    name = self.mongo_database_name
    return self.mongo.get_database(name)

get_options_collection()

Returns the options collection.

Returns:

| Type | Description |
| ---- | ----------- |
| `Collection` | a pymongo Collection object |

Source code in splitgill/manager.py
66
67
68
69
70
71
72
def get_options_collection(self) -> Collection:
    """
    Looks up the collection holding parsing options. The collection name is a
    fixed constant, so a single collection serves every Splitgill database.

    :return: a pymongo Collection object
    """
    database = self.get_mongo_database()
    return database.get_collection(OPTIONS_COLLECTION_NAME)

SplitgillDatabase

Represents a single set of data to be managed by Splitgill.

Under the hood, this data will exist in several MongoDB collections and Elasticsearch indices, but this object provides an abstraction layer to access all of that from one object.

Source code in splitgill/manager.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
class SplitgillDatabase:
    """
    Represents a single set of data to be managed by Splitgill.

    Under the hood, this data will exist in several MongoDB collections and
    Elasticsearch indices, but this object provides an abstraction layer to access all
    of that from one object.
    """

    def __init__(self, name: str, client: SplitgillClient):
        """
        :param name: the name of the database. It is used to build both a MongoDB
            collection name and Elasticsearch index names, so it must be valid as
            both
        :param client: the SplitgillClient supplying the MongoDB and Elasticsearch
            connections
        """
        self.name = name
        self._client = client
        # the data collection is specific to this database, whereas the options
        # collection is looked up without a name and so is shared
        self.data_collection = client.get_data_collection(name)
        self.options_collection = client.get_options_collection()
        self.indices = IndexNames(name)
        # reuse the client's lock manager rather than creating a new one
        self.locker = client.lock_manager

    def get_current_indices(self) -> List[str]:
        """
        Lists the Elasticsearch indices this database is currently using.

        :return: the index names, in ascending alphabetical order
        """
        aliases = self._client.elasticsearch.indices.get_alias(
            index=self.indices.wildcard
        )
        index_names = list(aliases.keys())
        index_names.sort()
        return index_names

    def get_arc_status(self) -> ArcStatus:
        """
        Reports on the newest archive (arc) index used by this database in
        Elasticsearch.

        Old versions of records are appended to the arc indices. Filling starts at
        arc-0 and moves on to arc-1, arc-2 and so on. This returns the number of the
        highest-numbered arc together with the count of documents it holds.

        :return: an ArcStatus object, ArcStatus(0, 0) if no arc indices exist
        """
        arc_aliases = self._client.elasticsearch.indices.get_alias(
            index=self.indices.arc_wildcard
        )
        if not arc_aliases:
            # no arc indices exist at all
            return ArcStatus(0, 0)

        # each arc index name ends with -<arc number>
        arc_numbers = [
            int(index_name.rsplit('-', 1)[-1]) for index_name in arc_aliases.keys()
        ]
        newest_arc = max(arc_numbers)
        counted = self._client.elasticsearch.count(
            index=self.indices.get_arc(newest_arc)
        )
        return ArcStatus(newest_arc, counted['count'])

    def get_committed_version(self) -> Optional[int]:
        """
        Finds the newest committed version across this database's data and options,
        whichever is higher.

        :return: the highest committed version, or None if there are no records or
            options, or none of them have been committed
        """
        newest_first = [('version', DESCENDING)]
        newest_data = self.data_collection.find_one(sort=newest_first)
        newest_options = self.options_collection.find_one(
            {'name': self.name}, sort=newest_first
        )

        committed_versions = [
            version
            for version in (
                doc.get('version')
                for doc in (newest_data, newest_options)
                if doc is not None
            )
            if version is not None
        ]
        # default=None covers the case where nothing has been committed
        return max(committed_versions, default=None)

    def get_elasticsearch_version(self) -> Optional[int]:
        """
        Finds the highest version present in this database's Elasticsearch indices.

        Both the version field and the next field are checked. The next field is
        needed so that updates consisting only of deletions are still accounted for.

        :return: the highest version, or None if no index holds any records
        """
        maxima = []
        for field in (DocumentField.VERSION, DocumentField.NEXT):
            response = self._client.elasticsearch.search(
                aggs={'max_version': {'max': {'field': field}}},
                size=0,
                # deletes have no document in the latest index, so search every index
                index=self.indices.wildcard,
            )
            field_max = get_in(('aggregations', 'max_version', 'value'), response, None)
            if field_max is not None:
                maxima.append(field_max)

        if not maxima:
            return None
        # the max aggregation returns a double, so convert back to an int for callers
        return int(max(maxima))

    def get_rounded_version(self, version: int) -> Optional[int]:
        """
        Rounds the target version down to the nearest version available in this
        database's indexed data. The result is the version of the data that applies
        at the target version.

        None is returned if no indexed versions are available, or if the target is
        earlier than the earliest indexed version.

        :param version: the target version
        :return: a version or None
        """
        available = self.get_versions()
        if not available:
            return None
        if version < available[0]:
            return None
        # bisect_right gives the insertion point after any equal entry, so the entry
        # just before it is the greatest version <= the target
        insertion_point = bisect_right(available, version)
        return available[insertion_point - 1]

    def has_data(self) -> bool:
        """
        Checks whether this database holds at least one committed record. Options are
        not considered.

        :return: True if a committed record exists, False if not
        """
        committed_record = self.data_collection.find_one({'version': {'$ne': None}})
        return committed_record is not None

    def has_options(self) -> bool:
        """
        Checks whether at least one committed set of options exists for this
        database. Data is not considered.

        The options collection is shared between all Splitgill databases, with each
        entry tagged by a ``name`` field. The query is therefore scoped to this
        database's name so that other databases' options are not counted.

        :return: True if this database has committed options, False if not
        """
        committed_options = self.options_collection.find_one(
            {'name': self.name, 'version': {'$ne': None}}
        )
        return committed_options is not None

    def commit(self) -> Optional[int]:
        """
        Commits this database's currently uncommitted data and options changes.

        Every newly committed data/options entry gets the same version, taken from
        now(), the current time. If there were no changes, None is returned;
        otherwise the new version is returned.

        If a commit is already ongoing this will raise an AlreadyLocked exception.

        :return: the new version or None if no uncommitted changes were found
        """
        # todo: global now?
        # todo: transaction/rollback? Can't do this without replicasets so who knows?
        with self.locker.lock(self.name, stage='commit'):
            if not self.has_uncommitted_data() and not self.has_uncommitted_options():
                # nothing to commit, so nothing to do
                return None

            if not self.has_options() and not self.has_uncommitted_options():
                # no existing options and no options to be committed, so create some
                # basic parsing options to use
                options = ParsingOptionsBuilder().build()
                self.update_options(options, commit=False)

            version = now()

            # these are two independent updates, not a transaction (see the todo
            # above). The data collection belongs to this database alone, so every
            # uncommitted record in it can be stamped
            self.data_collection.update_many(
                filter={'version': None}, update={'$set': {'version': version}}
            )
            # the options collection is shared between databases, so only this
            # database's uncommitted options may be stamped with the new version
            self.options_collection.update_many(
                filter={'name': self.name, 'version': None},
                update={'$set': {'version': version}},
            )
            return version

    def ingest(
        self,
        records: Iterable[Record],
        commit=True,
        modified_field: Optional[str] = None,
    ) -> IngestResult:
        """
        Writes the given records to this database's MongoDB data collection. This
        does not index the new data into Elasticsearch. The records are written with
        a None version and are only given a version if commit is True.

        The commit keyword argument controls whether the "transaction" is closed
        after the records are written (the default) or left open. If an error occurs,
        the "transaction" is not committed, but the changes already written are not
        rolled back either.

        :param records: the records to add. They are written in batches, so a very
            large stream of records is fine
        :param commit: whether to commit the added data with a new version after
            writing the records. Default: True.
        :param modified_field: the name of a field that is ignored when it is the
            only thing that changed in a record's data. This exists to avoid
            committing a new version of a record that was merely touched, i.e. only
            its modified date was updated while the rest stayed the same. Default:
            None, meaning changes in any field count.
        :return: an IngestResult object
        """
        # creating the indexes is a no-op if they already exist
        self.data_collection.create_indexes(
            [IndexModel([('id', ASCENDING)]), IndexModel([('version', DESCENDING)])]
        )

        # this size is passed to generate_ops as its find size and is also used to
        # partition the bulk ops. The two must match to handle duplicates in the
        # record stream
        batch_size = 200
        ingest_result = IngestResult()

        op_stream = generate_ops(
            self.data_collection, records, modified_field, batch_size
        )
        for op_batch in partition(op_stream, batch_size):
            ingest_result.update(self.data_collection.bulk_write(op_batch))

        if commit:
            ingest_result.version = self.commit()
        return ingest_result

    def update_options(self, options: ParsingOptions, commit=True) -> Optional[int]:
        """
        Sets new parsing options for this database.

        Any pending uncommitted options are discarded first. If the new options equal
        the latest committed ones, nothing is written.

        :param options: the new parsing options
        :param commit: whether to commit the new options with a new version once they
            are written. Default: True.
        :return: the new version if a commit happened, otherwise None. None is also
            returned if a commit was requested but the options were unchanged.
        """
        # get_options only returns committed options by default
        committed = self.get_options()
        latest_options = committed[max(committed)] if committed else None

        if self.has_uncommitted_options():
            self.rollback_options()

        if latest_options == options:
            # identical to the latest committed options, nothing to do
            return None

        # the options are new or differ from the latest, so write a fresh entry
        self.options_collection.insert_one(
            {
                'name': self.name,
                # a None version marks this entry as uncommitted
                'version': None,
                'options': options.to_doc(),
            }
        )

        return self.commit() if commit else None

    def rollback_options(self) -> int:
        """
        Removes any uncommitted options changes belonging to this database.

        There should only ever be one, but all of them are deleted so that everything
        is clean and tidy. The options collection is shared between Splitgill
        databases, so the delete is scoped to this database's name. Other databases'
        pending options are left untouched.

        :return: the number of documents deleted
        """
        result = self.options_collection.delete_many(
            {'name': self.name, 'version': None}
        )
        return result.deleted_count

    def rollback_records(self):
        """
        Discards any uncommitted data changes.

        Every uncommitted record in the data collection has to be examined to do this,
        so the run time grows with the amount of uncommitted data.
        """
        if not self.has_uncommitted_data():
            return

        rollback_ops = generate_rollback_ops(self.data_collection)
        for op_batch in partition(rollback_ops, 200):
            self.data_collection.bulk_write(op_batch)

    def has_uncommitted_data(self) -> bool:
        """
        Checks whether this database holds any uncommitted records.

        :return: True if at least one record has a None version, False if not
        """
        pending_record = self.data_collection.find_one({'version': None})
        return pending_record is not None

    def has_uncommitted_options(self) -> bool:
        """
        Checks whether there are any uncommitted options stored for this database.

        The options collection is shared between Splitgill databases, so the query is
        scoped to this database's name. Another database's pending options therefore
        do not count.

        :return: True if this database has uncommitted options, False if not
        """
        pending_options = self.options_collection.find_one(
            {'name': self.name, 'version': None}
        )
        return pending_options is not None

    def get_options(self, include_uncommitted=False) -> Dict[int, ParsingOptions]:
        """
        Collects this database's parsing options, keyed by version.

        :param include_uncommitted: if True, uncommitted options are included as well
            (under a None key); by default only committed options are returned
        :return: a dict of versions and options
        """
        options_by_version = {}
        for doc in self.options_collection.find({'name': self.name}):
            version = doc['version']
            if version is None and not include_uncommitted:
                # uncommitted entry that the caller didn't ask for
                continue
            options_by_version[version] = ParsingOptions.from_doc(doc['options'])
        return options_by_version

    def iter_records(self, **find_kwargs) -> Iterable[MongoRecord]:
        """
        Lazily yields a MongoRecord for each data collection document matching the
        given arguments.

        :param find_kwargs: passed straight through to PyMongo's find method on the
            data collection
        :return: yields matching MongoRecord objects
        """
        for doc in self.data_collection.find(**find_kwargs):
            yield MongoRecord(**doc)

    def sync(
        self, bulk_options: Optional[BulkOptions] = None, resync: bool = False
    ) -> WriteResult:
        """
        Synchronise the data/options in MongoDB with Elasticsearch by updating the
        latest and archive (arc) indices as required.

        The latest version in Elasticsearch is compared with the latest committed
        version in MongoDB. If Elasticsearch is already at or beyond MongoDB's
        version, nothing is written. Otherwise the records that have changed since
        Elasticsearch's version are written. If an options change is newer than
        Elasticsearch's version, every committed record is checked again instead.

        The original design notes that refreshing is paused on this database's
        indexes while the data is written. If an error occurs, the refresh interval
        is not reset, so partial updates do not affect searches until a refresh
        happens. On success, the refresh interval and replica count are reset and a
        refresh is performed, so the synced data is immediately searchable.

        If resync=True, the arc indices are deleted before reindexing begins. The
        latest index is not deleted because its documents have IDs and are replaced
        directly.

        :param bulk_options: options determining how the bulk operations are sent to
            Elasticsearch
        :param resync: whether to resync all records with Elasticsearch regardless of
            the currently synced version. No data in the latest index is deleted (the
            records are replaced during the resync), but all arc indices are deleted
            before re-indexing.
        :return: a WriteResult object
        """
        if not self.has_data():
            return WriteResult()

        all_options = self.get_options(include_uncommitted=False)
        last_sync = None if resync else self.get_elasticsearch_version()

        # by default, every committed record is considered
        find_filter = {'version': {'$ne': None}}
        if last_sync is not None:
            if last_sync >= self.get_committed_version():
                # already in sync (>= rather than == just in case something odd
                # has happened)
                return WriteResult()
            options_changed = any(version > last_sync for version in all_options)
            if not options_changed:
                # only the records updated since the last sync need writing
                find_filter = {'version': {'$gt': last_sync}}

        if resync:
            # remove every arc index before rebuilding
            arc_aliases = self._client.elasticsearch.indices.get_alias(
                index=self.indices.arc_wildcard
            )
            for arc_index in arc_aliases.keys():
                self._client.elasticsearch.indices.delete(index=arc_index)

        create_templates(self._client.elasticsearch)

        index_ops = generate_index_ops(
            self.indices,
            self.get_arc_status(),
            self.iter_records(filter=find_filter),
            all_options,
            last_sync,
        )
        return write_ops(self._client.elasticsearch, index_ops, bulk_options)

    def resync_arcs(self, bulk_options: Optional[BulkOptions] = None) -> WriteResult:
        """
        Rebuilds every arc index from scratch: the existing arc indices are deleted
        and then regenerated from all committed records. The latest index is left
        alone.

        :param bulk_options: options determining how the bulk operations are sent to
            Elasticsearch
        :return: a WriteResult object
        """
        es = self._client.elasticsearch

        arc_aliases = es.indices.get_alias(index=self.indices.arc_wildcard)
        for arc_index in arc_aliases.keys():
            es.indices.delete(index=arc_index)

        create_templates(es)

        # look this up once rather than once per op
        latest_index_name = self.indices.latest

        every_op = generate_index_ops(
            self.indices,
            # the arcs were all just deleted, so this is the empty/default status
            self.get_arc_status(),
            # all committed records
            self.iter_records(filter={'version': {'$ne': None}}),
            # all committed options
            self.get_options(include_uncommitted=False),
            None,
        )
        # drop index and delete ops that target the latest index so only the arcs
        # get written
        arc_ops = (op for op in every_op if op.index != latest_index_name)
        return write_ops(es, arc_ops, bulk_options)

    def search(
        self, version: Union[SearchVersion, int] = SearchVersion.latest
    ) -> Search:
        """
        Creates a Search DSL object to use on this database's indexed data. The
        Search uses this database's Elasticsearch client and is set up with the
        index and version filter appropriate to the given version parameter.

        If an int version is given, it is compared with the latest version available
        in Elasticsearch. If it is at or above that version, only the latest index is
        searched and no version filter is applied; this helps Elasticsearch
        performance and caching. Otherwise (including when Elasticsearch has no
        version at all), every index of this database is searched with a version
        filter.

        Otherwise the version must be a SearchVersion (or one of its raw values,
        'latest' or 'all'). SearchVersion.latest searches this database's latest
        index, and SearchVersion.all searches every index used by this database.

        :param version: the version to search at, either a SearchVersion enum option
            or an int version
        :return: a Search DSL object
        :raises ValueError: if version is neither an int nor a valid SearchVersion
        """
        search = Search(using=self._client.elasticsearch)

        if isinstance(version, int):
            current_version = self.get_elasticsearch_version()
            if current_version is not None and current_version <= version:
                # at/above the latest version: the latest index holds exactly the
                # right data, and querying it without a filter is faster and more
                # easily cachable for Elasticsearch
                return search.index(self.indices.latest)
            return search.index(self.indices.wildcard).filter(version_query(version))

        # normalise to a SearchVersion member. An unrecognised value raises a
        # ValueError here instead of falling through with no index set, which would
        # have searched every index in the cluster rather than just this database's
        search_version = SearchVersion(version)
        if search_version == SearchVersion.latest:
            return search.index(self.indices.latest)
        return search.index(self.indices.wildcard)

    def get_version_changed_counts(self) -> Dict[int, int]:
        """
        Maps each available version to the number of documents that changed in this
        database at that version. Changes can be additions, modifications, or
        deletions.

        :return: versions and associated change counts
        """
        search = self.search(version=SearchVersion.all)

        counts = {}
        # the version field is aggregated first, then the next field, which also
        # picks up versions at which records were deleted.
        # NOTE(review): dict.update overwrites, so when a version appears in both
        # aggregations its next-field count replaces its version-field count rather
        # than the two being combined. Confirm this is intended.
        for field in (DocumentField.VERSION, DocumentField.NEXT):
            field_counts = {term.value: term.count for term in iter_terms(search, field)}
            counts.update(field_counts)
        return counts

    def get_versions(self) -> List[int]:
        """
        Lists the versions of this database that have been indexed into
        Elasticsearch. Both the version and next document fields are taken into
        account, so versions consisting only of deletions are included.

        :return: the available versions in ascending order
        """
        versions = list(self.get_version_changed_counts())
        versions.sort()
        return versions

    def get_data_fields(
        self,
        version: Optional[int] = None,
        query: Optional[Query] = None,
        **iter_terms_kwargs,
    ) -> List[DataField]:
        """
        Retrieves the available data fields for this database, optionally at the given
        version with the given query.

        :param version: the version to find data fields at, if None, the latest data is
            searched
        :param query: the query to filter records with before finding the data fields,
            if None, all record data is considered
        :param iter_terms_kwargs: kwargs passed directly to iter_terms (e.g. chunk_size,
            sample_probability). Any "search" or "field" entries are ignored because
            this method supplies those arguments itself.
        :return: a list of DataField objects, most frequent first. Fields with equal
            frequency are in ascending alphabetical order of path.
        """
        search = self.search(version if version is not None else SearchVersion.latest)
        if query is not None:
            search = search.filter(query)

        # drop kwargs that would clash with the search/field arguments passed below
        extra_kwargs = {
            k: v for k, v in iter_terms_kwargs.items() if k not in ('search', 'field')
        }

        fields: Dict[str, DataField] = {}

        # create the basic field objects and add type counts. Each term value is a
        # field path followed by one final dot-separated segment of raw type info
        for term in iter_terms(search, DocumentField.DATA_TYPES, **extra_kwargs):
            path, raw_types = term.value.rsplit('.', 1)
            if path not in fields:
                fields[path] = DataField(path)
            fields[path].add(raw_types, term.count)

        # link fields into a hierarchy. A container's direct children are the fields
        # whose path is the container's path plus exactly one more dot-separated
        # segment, so each field's parent can be looked up directly by stripping its
        # last segment. This avoids comparing every pair of fields (O(n^2)). Fields
        # are visited in insertion order, so each parent's children keep that order.
        for child in fields.values():
            parent_path, dot, _ = child.path.rpartition('.')
            if not dot:
                # a root-level field has no parent
                continue
            parent = fields.get(parent_path)
            if parent is not None and parent.is_container:
                parent.children.append(child)
                child.parent = parent

        # order by descending frequency, breaking ties by ascending path. Python's
        # sort is stable, so sorting by path and then by count gives that ordering.
        # Paths are unique (they are the dict keys), so no further tie-breaker can
        # ever apply.
        data_fields = sorted(fields.values(), key=lambda f: f.path)
        data_fields.sort(key=lambda f: f.count, reverse=True)
        return data_fields

    def get_parsed_fields(
        self,
        version: Optional[int] = None,
        query: Optional[Query] = None,
        **iter_terms_kwargs,
    ) -> List[ParsedField]:
        """
        Retrieves the available parsed fields for this database, optionally at the given
        version with the given query.

        :param version: the version to find parsed fields at, if None, the latest data
            is searched
        :param query: the query to filter records with before finding the parsed fields,
            if None, all record data is considered
        :param iter_terms_kwargs: kwargs passed directly to iter_terms (e.g. chunk_size,
            sample_probability). Any "search" or "field" kwargs are ignored as this
            method supplies those itself.
        :return: a list of ParsedField objects, most frequent first, with ties broken by
            ascending path
        """
        search = self.search(version if version is not None else SearchVersion.latest)
        if query is not None:
            search = search.filter(query)

        # the search and field are passed positionally below, so strip any same-named
        # kwargs the caller provided to avoid supplying them twice
        extra_kwargs = {
            k: v for k, v in iter_terms_kwargs.items() if k not in ('search', 'field')
        }

        fields: Dict[str, ParsedField] = {}

        # create the basic field objects and add type counts. Each term value is the
        # field path followed by one final dot-separated component holding the types
        for term in iter_terms(search, DocumentField.PARSED_TYPES, **extra_kwargs):
            path, raw_types = term.value.rsplit('.', 1)
            if path not in fields:
                fields[path] = ParsedField(path)
            fields[path].add(raw_types, term.count)

        # order by descending frequency, breaking ties with ascending path. Python's
        # sort is stable (also with reverse=True), so sorting by path and then by count
        # yields that order. Paths are unique dict keys, so this is already a total
        # order and no further tie-breaker (e.g. depth) could ever take effect.
        parsed_fields = sorted(fields.values(), key=lambda f: f.path)
        parsed_fields.sort(key=lambda f: f.count, reverse=True)
        return parsed_fields

    def get_field_names(self) -> List['ParsedField']:
        """
        Retrieves a list of fields, as ParsedField objects, from the latest index
        mapping.

        Does not take any version or query parameters; simply returns all the "data."
        fields available on the index, along with their available types. All relevant
        type counts are set to 1 to enable use of e.g. .is_text(). Use get_data_fields
        or get_parsed_fields if you need accurate counts, or to filter by version or
        query.

        :return: a list of ParsedField objects, one per data field path which has at
            least one parsed type in the mapping
        """
        latest_index = Index(self.indices.latest, using=self._client.elasticsearch)
        mapping = latest_index.get_mapping()
        top_level = get_in(
            [self.indices.latest, 'mappings', 'properties', 'data', 'properties'],
            mapping.body,
            default={},
        )
        parsed_fields: Dict[str, ParsedField] = {}

        def _extract_fields(properties: dict, parents: List[str]) -> None:
            # properties maps names to sub-mappings. Names which start with an
            # underscore and have a "type" are the parsed types of the data field at
            # parents; anything else is a nested data field to descend into. Every
            # call is given a properties dict directly, so a data field that happens
            # to be called "properties" is just another nested field here.
            for name, props in properties.items():
                if name.startswith('_') and 'type' in props:
                    if not parents:
                        # a parsed type with no data field to attach it to
                        continue
                    field_path = '.'.join(parents)
                    if field_path not in parsed_fields:
                        parsed_fields[field_path] = ParsedField(field_path)
                    parsed_fields[field_path].add(name, 1)
                else:
                    # a mapping with no properties has no parsed types below it
                    _extract_fields(props.get('properties', {}), parents + [name])

        _extract_fields(top_level, [])

        return list(parsed_fields.values())

__init__(name, client)

Parameters:

Name Type Description Default
name str

the name of the database, needs to be a valid MongoDB collection name and a valid Elasticsearch index name

required
client SplitgillClient

a SplitgillClient object

required
Source code in splitgill/manager.py
113
114
115
116
117
118
119
120
121
122
123
124
def __init__(self, name: str, client: SplitgillClient):
    """
    Sets up a handle on one Splitgill database, wiring together its MongoDB
    collections, its Elasticsearch index names and the client's lock manager.

    :param name: the name of the database, needs to be a valid MongoDB collection
                 name and a valid Elasticsearch index name
    :param client: a SplitgillClient object
    """
    self.name = name
    self._client = client
    # the data collection is looked up by this database's name, whereas the options
    # collection is requested from the client without a name
    self.data_collection = client.get_data_collection(name)
    self.options_collection = client.get_options_collection()
    self.indices = IndexNames(name)
    # the lock manager is owned by the client and reused here
    self.locker = client.lock_manager

commit()

Commits the currently uncommitted data and options changes for this database. All new data/options will be given the same version which is the current time. If no changes were made, None is returned, otherwise the new version is returned.

If a commit is already ongoing this will raise an AlreadyLocked exception.

Returns:

Type Description
Optional[int]

the new version or None if no uncommitted changes were found

Source code in splitgill/manager.py
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def commit(self) -> Optional[int]:
    """
    Commits the currently uncommitted data and options changes for this database.
    All new data/options will be given the same version which is the current time.
    If no changes were made, None is returned, otherwise the new version is
    returned.

    If a commit is already ongoing this will raise an AlreadyLocked exception.

    :return: the new version or None if no uncommitted changes were found
    """
    # todo: global now?
    # todo: transaction/rollback? Can't do this without replicasets so who knows?
    with self.locker.lock(self.name, stage='commit'):
        if not self.has_uncommitted_data() and not self.has_uncommitted_options():
            # nothing to commit, so nothing to do
            return None

        if not self.has_options() and not self.has_uncommitted_options():
            # no existing options and no options to be committed, so create some
            # basic parsing options to use
            options = ParsingOptionsBuilder().build()
            self.update_options(options, commit=False)

        version = now()

        # stamp the uncommitted data and options with the new version. These are two
        # independent updates, not a transaction, so a failure between them can leave
        # only the data committed.
        # The data collection is looked up by this database's name, so every
        # uncommitted record in it belongs to this database.
        self.data_collection.update_many(
            filter={'version': None}, update={'$set': {'version': version}}
        )
        # The options collection is requested without a database name and its
        # documents carry a name field, so restrict the update to this database's
        # options to avoid committing another database's pending options.
        self.options_collection.update_many(
            filter={'name': self.name, 'version': None},
            update={'$set': {'version': version}},
        )
        return version

get_arc_status()

Get the status of the latest archive index in use by this database in Elasticsearch. Old versions of records are stored in the archive (arc) indices using an append strategy. This starts with arc-0 and after this arc is filled moves on to arc-1 and so on. This method returns the index of the most recently used arc (i.e. the arc with the highest index number) and the number of documents in it.

Returns:

Type Description
ArcStatus

an ArcStatus object

Source code in splitgill/manager.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def get_arc_status(self) -> ArcStatus:
    """
    Describes the newest archive ("arc") index this database uses in Elasticsearch.

    Old record versions are appended to arc indices: arc-0 first, then arc-1 once
    arc-0 is full, and so on. The result carries the highest arc number in use along
    with how many documents that arc currently holds. When no arc index exists yet,
    arc 0 with a count of 0 is reported.

    :return: an ArcStatus object
    """
    es = self._client.elasticsearch
    # every arc index belonging to this database, keyed by index name
    arc_indices = es.indices.get_alias(index=self.indices.arc_wildcard)
    if not arc_indices:
        return ArcStatus(0, 0)

    # the arc number is the final dash-separated segment of each index name
    newest_arc = max(
        int(index_name.rsplit('-', 1)[-1]) for index_name in arc_indices.keys()
    )
    newest_count = es.count(index=self.indices.get_arc(newest_arc))['count']
    return ArcStatus(newest_arc, newest_count)

get_committed_version()

Returns the latest committed version of the data or config (whichever is higher). If no records or options exist, or if neither has any committed values, None is returned.

Returns:

Type Description
Optional[int]

the max version or None

Source code in splitgill/manager.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
def get_committed_version(self) -> Optional[int]:
    """
    Finds the highest committed version across this database's data and options.

    The newest data document and the newest options document for this database
    are each fetched (version descending). Whichever of their versions is higher is
    returned; if neither exists or neither has a committed version, the result is
    None.

    :return: the max version or None
    """
    newest_first = [('version', DESCENDING)]
    candidates = []

    newest_data = self.data_collection.find_one(sort=newest_first)
    if newest_data is not None:
        candidates.append(newest_data.get('version'))

    newest_options = self.options_collection.find_one(
        {'name': self.name}, sort=newest_first
    )
    if newest_options is not None:
        candidates.append(newest_options.get('version'))

    committed = [candidate for candidate in candidates if candidate is not None]
    return max(committed) if committed else None

get_current_indices()

Returns a list of the indices currently in use by this database in Elasticsearch sorted in ascending alphabetical order.

Returns:

Type Description
List[str]

a list of index names

Source code in splitgill/manager.py
126
127
128
129
130
131
132
133
134
def get_current_indices(self) -> List[str]:
    """
    Lists, alphabetically, the names of every Elasticsearch index matching this
    database's index wildcard.

    :return: a list of index names
    """
    aliases = self._client.elasticsearch.indices.get_alias(index=self.indices.wildcard)
    index_names = list(aliases.keys())
    index_names.sort()
    return index_names

get_data_fields(version=None, query=None, **iter_terms_kwargs)

Retrieves the available data fields for this database, optionally at the given version with the given query.

Parameters:

Name Type Description Default
version Optional[int]

the version to find data fields at, if None, the latest data is searched

None
query Optional[Query]

the query to filter records with before finding the data fields, if None, all record data is considered

None
iter_terms_kwargs

kwargs passed directly to iter_terms (e.g. chunk_size, sample_probability)

{}

Returns:

Type Description
List[DataField]

a list of DataField objects with the most frequent field first

Source code in splitgill/manager.py
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
def get_data_fields(
    self,
    version: Optional[int] = None,
    query: Optional[Query] = None,
    **iter_terms_kwargs,
) -> List[DataField]:
    """
    Retrieves the available data fields for this database, optionally at the given
    version with the given query.

    :param version: the version to find data fields at, if None, the latest data is
        searched
    :param query: the query to filter records with before finding the data fields,
        if None, all record data is considered
    :param iter_terms_kwargs: kwargs passed directly to iter_terms (e.g. chunk_size,
        sample_probability). Any "search" or "field" kwargs are ignored as this
        method supplies those itself.
    :return: a list of DataField objects, most frequent first, with ties broken by
        ascending path
    """
    search = self.search(version if version is not None else SearchVersion.latest)
    if query is not None:
        search = search.filter(query)

    # the search and field are passed positionally below, so strip any same-named
    # kwargs the caller provided to avoid supplying them twice
    extra_kwargs = {
        k: v for k, v in iter_terms_kwargs.items() if k not in ('search', 'field')
    }

    fields: Dict[str, DataField] = {}

    # create the basic field objects and add type counts. Each term value is the
    # field path followed by one final dot-separated component holding the types
    for term in iter_terms(search, DocumentField.DATA_TYPES, **extra_kwargs):
        path, raw_types = term.value.rsplit('.', 1)
        if path not in fields:
            fields[path] = DataField(path)
        fields[path].add(raw_types, term.count)

    # link each field with its direct parent to create the hierarchy. A field's
    # direct parent path is its own path minus the final dot-separated component, so
    # one dict lookup per field replaces comparing every pair of fields. Fields are
    # visited in insertion order so each parent's children list keeps that order.
    for child in fields.values():
        if '.' not in child.path:
            # root level field, no parent
            continue
        parent = fields.get(child.path.rsplit('.', 1)[0])
        if parent is not None and parent.is_container:
            parent.children.append(child)
            child.parent = parent

    # order by descending frequency, breaking ties with ascending path. Python's sort
    # is stable (also with reverse=True), so sorting by path and then by count yields
    # that order. Paths are unique dict keys, so this is already a total order and no
    # further tie-breaker (e.g. depth) could ever take effect.
    data_fields = sorted(fields.values(), key=lambda f: f.path)
    data_fields.sort(key=lambda f: f.count, reverse=True)
    return data_fields

get_elasticsearch_version()

Returns the latest version found in the Elasticsearch indices for this database. If no records exist in any index, None is returned. This method checks both the maximum value in the version field and the next field. Checking the next field accounts for updates that only include deletions.

Returns:

Type Description
Optional[int]

the max version or None

Source code in splitgill/manager.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def get_elasticsearch_version(self) -> Optional[int]:
    """
    Reports the highest version present in any of this database's Elasticsearch
    indices, or None when no index holds any documents.

    A max aggregation is run over the version field and, separately, over the next
    field. The next field matters because an update made up solely of deletions
    may only be visible there.

    :return: the max version or None
    """
    maxima = []
    for field in (DocumentField.VERSION, DocumentField.NEXT):
        # query every index rather than just latest, as deletes won't have a
        # document in latest
        response = self._client.elasticsearch.search(
            index=self.indices.wildcard,
            size=0,
            aggs={'max_version': {'max': {'field': field}}},
        )
        maxima.append(get_in(('aggregations', 'max_version', 'value'), response, None))

    found = [value for value in maxima if value is not None]
    if not found:
        return None
    # max aggregations come back as doubles, so turn the result back into an int
    return int(max(found))

get_field_names()

Retrieves a list of fields (as ParsedField objects) from the latest index mapping.

Does not take any version or query parameters; simply returns all the "data." fields available on the index, along with their available types. All relevant type counts are set to 1 to enable use of e.g. .is_text(). Use get_data_fields or get_parsed_fields if you need accurate counts, or to filter by version or query.

Source code in splitgill/manager.py
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
def get_field_names(self) -> List['ParsedField']:
    """
    Retrieves a list of fields, as ParsedField objects, from the latest index
    mapping.

    Does not take any version or query parameters; simply returns all the "data."
    fields available on the index, along with their available types. All relevant
    type counts are set to 1 to enable use of e.g. .is_text(). Use get_data_fields
    or get_parsed_fields if you need accurate counts, or to filter by version or
    query.

    :return: a list of ParsedField objects, one per data field path which has at
        least one parsed type in the mapping
    """
    latest_index = Index(self.indices.latest, using=self._client.elasticsearch)
    mapping = latest_index.get_mapping()
    top_level = get_in(
        [self.indices.latest, 'mappings', 'properties', 'data', 'properties'],
        mapping.body,
        default={},
    )
    parsed_fields: Dict[str, ParsedField] = {}

    def _extract_fields(properties: dict, parents: List[str]) -> None:
        # properties maps names to sub-mappings. Names which start with an
        # underscore and have a "type" are the parsed types of the data field at
        # parents; anything else is a nested data field to descend into. Every call
        # is given a properties dict directly, so a data field that happens to be
        # called "properties" is just another nested field here.
        for name, props in properties.items():
            if name.startswith('_') and 'type' in props:
                if not parents:
                    # a parsed type with no data field to attach it to
                    continue
                field_path = '.'.join(parents)
                if field_path not in parsed_fields:
                    parsed_fields[field_path] = ParsedField(field_path)
                parsed_fields[field_path].add(name, 1)
            else:
                # a mapping with no properties has no parsed types below it
                _extract_fields(props.get('properties', {}), parents + [name])

    _extract_fields(top_level, [])

    return list(parsed_fields.values())

get_options(include_uncommitted=False)

Retrieve all the parsing options configured for this database in a dict mapping int versions to ParsingOptions objects. Use the include_uncommitted parameter to indicate whether to include the uncommitted options or not.

Returns:

Type Description
Dict[int, ParsingOptions]

a dict of versions and options

Source code in splitgill/manager.py
414
415
416
417
418
419
420
421
422
423
424
425
426
def get_options(self, include_uncommitted=False) -> Dict[int, ParsingOptions]:
    """
    Builds a mapping from version to ParsingOptions for every options document
    stored under this database's name. Uncommitted options (whose version is None)
    are left out unless include_uncommitted is True.

    :return: a dict of versions and options
    """
    options_by_version = {}
    for doc in self.options_collection.find({'name': self.name}):
        doc_version = doc['version']
        if doc_version is None and not include_uncommitted:
            # uncommitted and the caller hasn't asked for those
            continue
        options_by_version[doc_version] = ParsingOptions.from_doc(doc['options'])
    return options_by_version

get_parsed_fields(version=None, query=None, **iter_terms_kwargs)

Retrieves the available parsed fields for this database, optionally at the given version with the given query.

Parameters:

Name Type Description Default
version Optional[int]

the version to find parsed fields at, if None, the latest data is searched

None
query Optional[Query]

the query to filter records with before finding the parsed fields, if None, all record data is considered

None
iter_terms_kwargs

kwargs passed directly to iter_terms (e.g. chunk_size, sample_probability)

{}

Returns:

Type Description
List[ParsedField]

a list of ParsedField objects with the most frequent field first

Source code in splitgill/manager.py
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
def get_parsed_fields(
    self,
    version: Optional[int] = None,
    query: Optional[Query] = None,
    **iter_terms_kwargs,
) -> List[ParsedField]:
    """
    Retrieves the available parsed fields for this database, optionally at the given
    version with the given query.

    :param version: the version to find parsed fields at, if None, the latest data
        is searched
    :param query: the query to filter records with before finding the parsed fields,
        if None, all record data is considered
    :param iter_terms_kwargs: kwargs passed directly to iter_terms (e.g. chunk_size,
        sample_probability). Any "search" or "field" kwargs are ignored as this
        method supplies those itself.
    :return: a list of ParsedField objects, most frequent first, with ties broken by
        ascending path
    """
    search = self.search(version if version is not None else SearchVersion.latest)
    if query is not None:
        search = search.filter(query)

    # the search and field are passed positionally below, so strip any same-named
    # kwargs the caller provided to avoid supplying them twice
    extra_kwargs = {
        k: v for k, v in iter_terms_kwargs.items() if k not in ('search', 'field')
    }

    fields: Dict[str, ParsedField] = {}

    # create the basic field objects and add type counts. Each term value is the
    # field path followed by one final dot-separated component holding the types
    for term in iter_terms(search, DocumentField.PARSED_TYPES, **extra_kwargs):
        path, raw_types = term.value.rsplit('.', 1)
        if path not in fields:
            fields[path] = ParsedField(path)
        fields[path].add(raw_types, term.count)

    # order by descending frequency, breaking ties with ascending path. Python's sort
    # is stable (also with reverse=True), so sorting by path and then by count yields
    # that order. Paths are unique dict keys, so this is already a total order and no
    # further tie-breaker (e.g. depth) could ever take effect.
    parsed_fields = sorted(fields.values(), key=lambda f: f.path)
    parsed_fields.sort(key=lambda f: f.count, reverse=True)
    return parsed_fields

get_rounded_version(version)

Given a target version, rounds the version down to the nearest available version. This in effect returns the version of the data that is applicable to the given target version.

If the target version is below the earliest version of this database's indexed data, or no indexed versions are available, None is returned.

Parameters:

Name Type Description Default
version int

the target version

required

Returns:

Type Description
Optional[int]

a version or None

Source code in splitgill/manager.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def get_rounded_version(self, version: int) -> Optional[int]:
    """
    Rounds a target version down to the nearest available indexed version, i.e.
    the version of the data that is applicable at the given target version.

    None is returned when there are no indexed versions, or when the target is
    earlier than the first indexed version of this database's data.

    :param version: the target version
    :return: a version or None
    """
    available = self.get_versions()
    # number of available versions that are <= the target
    position = bisect_right(available, version)
    if position == 0:
        # either nothing is indexed or the target predates every indexed version
        return None
    return available[position - 1]

get_version_changed_counts()

Retrieves the available versions and the number of documents that have changed in the database with that version. Changes can be additions, modifications, or deletions.

Returns:

Type Description
Dict[int, int]

versions and associated change counts

Source code in splitgill/manager.py
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
def get_version_changed_counts(self) -> Dict[int, int]:
    """
    Maps each available version to the number of documents that changed in the
    database at that version. Changes can be additions, modifications, or
    deletions.

    Term counts are gathered over all data, first for the version field and then
    for the next field (to catch deleted records). When a version appears in both
    fields, the count from the next field replaces the one from the version field
    rather than being added to it. NOTE(review): confirm this replacement is the
    intended counting behaviour.

    :return: versions and associated change counts
    """
    everything = self.search(version=SearchVersion.all)
    changed_counts: Dict[int, int] = {}

    # term counts over the version field
    for term in iter_terms(everything, DocumentField.VERSION):
        changed_counts[term.value] = term.count

    # term counts over the next field, which can bring in versions only seen there
    # (e.g. deletions); these overwrite any existing entry for the same version
    for term in iter_terms(everything, DocumentField.NEXT):
        changed_counts[term.value] = term.count

    return changed_counts

get_versions()

Returns a list of the available versions that have been indexed into Elasticsearch for this database. The versions are in ascending order and will be retrieved from both the version and next document fields to ensure we capture all versions.

Returns:

Type Description
List[int]

the available versions in ascending order

Source code in splitgill/manager.py
628
629
630
631
632
633
634
635
636
637
def get_versions(self) -> List[int]:
    """
    Lists every version that has been indexed into Elasticsearch for this
    database, smallest first. The versions are the keys of
    get_version_changed_counts, which draws on both the version and next document
    fields so that no version is missed.

    :return: the available versions in ascending order
    """
    changed_counts = self.get_version_changed_counts()
    available = list(changed_counts)
    available.sort()
    return available

has_data()

Returns True if there is at least one committed record in this database, otherwise returns False. Note that this ignores options.

Returns:

Type Description
bool

True if there is data, False if not

Source code in splitgill/manager.py
232
233
234
235
236
237
238
239
def has_data(self) -> bool:
    """
    Reports whether this database holds at least one committed record (a data
    document whose version is not None). Options are not considered.

    :return: True if there is data, False if not
    """
    first_committed = self.data_collection.find_one({'version': {'$ne': None}})
    return first_committed is not None

has_options()

Returns True if there is at least one committed set of options in this database, otherwise returns False. Note that this ignores data.

Returns:

Type Description
bool

True if there are options, False if not

Source code in splitgill/manager.py
241
242
243
244
245
246
247
248
def has_options(self) -> bool:
    """
    Returns True if there is at least one committed set of options for this
    database, otherwise returns False. Note that this ignores data.

    The options collection is requested from the client without a database name
    and its documents carry a name field (get_options filters on it), so the lookup
    is restricted to this database's name. Otherwise another database's options
    would make this return True.

    :return: True if there are options, False if not
    """
    committed = self.options_collection.find_one(
        {'name': self.name, 'version': {'$ne': None}}
    )
    return committed is not None

has_uncommitted_data()

Check if there are any uncommitted records stored against this database.

Returns:

Type Description
bool

returns True if there are any uncommitted records, False if not

Source code in splitgill/manager.py
398
399
400
401
402
403
404
def has_uncommitted_data(self) -> bool:
    """
    Reports whether any record in this database's data collection has yet to be
    committed (i.e. its version is None).

    :return: returns True if there are any uncommitted records, False if not
    """
    first_uncommitted = self.data_collection.find_one({'version': None})
    return first_uncommitted is not None

has_uncommitted_options()

Check if there are any uncommitted options stored against this database.

Returns:

Type Description
bool

returns True if there are any uncommitted options, False if not

Source code in splitgill/manager.py
406
407
408
409
410
411
412
def has_uncommitted_options(self) -> bool:
    """
    Check if there are any uncommitted options stored against this database.

    The options collection is requested from the client without a database name
    and its documents carry a name field (get_options filters on it), so the lookup
    is restricted to this database's name. Otherwise another database's pending
    options would make this return True.

    :return: returns True if there are any uncommitted options, False if not
    """
    uncommitted = self.options_collection.find_one(
        {'name': self.name, 'version': None}
    )
    return uncommitted is not None

ingest(records, commit=True, modified_field=None)

Ingests the given records to the database. This only adds the records to the MongoDB data collection, it doesn't trigger the indexing of this new data into the Elasticsearch cluster. All data will be added with a None version unless the commit parameter is True in which case a version will be assigned.

Use the commit keyword argument to either close the "transaction" after writing these records or leave it open. By default, the "transaction" is committed before the method returns, and the version is set then.

If an error occurs, the "transaction" will not be committed, but the changes will not be rolled back.

Parameters:

Name Type Description Default
records Iterable[Record]

the records to add. These will be added in batches, so it is safe to pass a very large stream of records

required
commit

whether to commit the data added with a new version after writing the records. Default: True.

True
modified_field Optional[str]

a field name such that, if the only changes in the record data are in this field, the changes will be ignored. As the name suggests, this parameter exists to avoid committing a new version of a record when the record has merely been touched (its modified field's date value updated) while the rest of the record remains the same. Default: None, meaning all fields are checked for changes.

None

Returns:

Type Description
IngestResult

returns an IngestResult object

Source code in splitgill/manager.py
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
def ingest(
    self,
    records: Iterable[Record],
    commit=True,
    modified_field: Optional[str] = None,
) -> IngestResult:
    """
    Writes the given records into this database's MongoDB data collection.

    Only MongoDB is touched here; nothing is indexed into Elasticsearch. Unless the
    commit parameter is True (in which case a version is assigned), the written data
    is left with a None version.

    The commit keyword argument controls whether the "transaction" is closed after
    the records are written or left open. The default is to commit before returning,
    which is when the version gets set.

    Should an error occur, the "transaction" is not committed, but any changes that
    were already written are not rolled back either.

    :param records: the records to add. They are written in batches, so passing a
        very large stream of records is safe
    :param commit: whether to commit the written data with a new version once all
        the records have been written. Default: True.
    :param modified_field: a field name which, if it is the only field whose value
        has changed in a record's data, causes the change to be ignored. The point
        of this is to avoid committing a new version of a record when it has merely
        been touched and its modified field's date value bumped while the rest of
        the record stayed the same. Default: None, meaning every field is checked
        for changes.
    :return: an IngestResult object
    """
    collection = self.data_collection

    # creating indexes that already exist is a no-op
    collection.create_indexes(
        [
            IndexModel([('id', ASCENDING)]),
            IndexModel([('version', DESCENDING)]),
        ]
    )

    ingest_result = IngestResult()
    # the batch size given to generate_ops (its find size) and the bulk write
    # partition size must be identical so duplicates within the record stream are
    # handled correctly
    batch_size = 200
    op_stream = generate_ops(collection, records, modified_field, batch_size)

    for batch in partition(op_stream, batch_size):
        ingest_result.update(collection.bulk_write(batch))

    if commit:
        ingest_result.version = self.commit()
    return ingest_result

iter_records(**find_kwargs)

Yields MongoRecord objects matching the given find kwargs. As you can probably guess, the find_kwargs argument is just passed directly to PyMongo's find method.

Parameters:

Name Type Description Default
find_kwargs

args to pass to the data collection's find method

{}

Returns:

Type Description
Iterable[MongoRecord]

yields matching MongoRecord objects

Source code in splitgill/manager.py
428
429
430
431
432
433
434
435
436
437
438
439
def iter_records(self, **find_kwargs) -> Iterable[MongoRecord]:
    """
    Generates a MongoRecord for each document in the data collection that matches
    the given find kwargs. The kwargs are handed straight through to PyMongo's find
    method without modification.

    :param find_kwargs: arguments forwarded to the data collection's find method
    :return: a generator of matching MongoRecord objects
    """
    for doc in self.data_collection.find(**find_kwargs):
        yield MongoRecord(**doc)

resync_arcs(bulk_options=None)

Resynchronises all the arc indices. The arc indices are deleted first and then resynced. The latest index isn't touched.

Parameters:

Name Type Description Default
bulk_options Optional[BulkOptions]

options determining how the bulk operations are sent to Elasticsearch

None

Returns:

Type Description
WriteResult

a WriteResult object

Source code in splitgill/manager.py
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
def resync_arcs(self, bulk_options: Optional[BulkOptions] = None) -> WriteResult:
    """
    Rebuilds every arc index from scratch. All existing arc indices are deleted and
    then repopulated from the committed data. The latest index is left untouched.

    :param bulk_options: options controlling how bulk operations are sent to
        Elasticsearch
    :return: a WriteResult object
    """
    es = self._client.elasticsearch

    # remove every existing arc index
    arc_aliases = es.indices.get_alias(index=self.indices.arc_wildcard)
    for arc_index in arc_aliases.keys():
        es.indices.delete(index=arc_index)

    create_templates(es)

    # looked up once rather than for every op
    latest_index_name = self.indices.latest

    index_ops = generate_index_ops(
        self.indices,
        # every arc index was just deleted, so this should be the default ArcStatus
        self.get_arc_status(),
        # only committed records
        self.iter_records(filter={'version': {'$ne': None}}),
        # only committed options
        self.get_options(include_uncommitted=False),
        None,
    )
    # drop any op (index or delete) aimed at the latest index so that only the arc
    # indices are written to
    arc_ops = (op for op in index_ops if op.index != latest_index_name)

    return write_ops(es, arc_ops, bulk_options)

rollback_options()

Remove any uncommitted option changes.

There should only ever be one, but this deletes them all ensuring everything is clean and tidy.

Returns:

Type Description
int

the number of documents deleted

Source code in splitgill/manager.py
375
376
377
378
379
380
381
382
383
384
def rollback_options(self) -> int:
    """
    Deletes any option changes that have not been committed yet.

    Normally at most one such document exists, but all of them are removed so that
    nothing uncommitted is left behind.

    :return: how many documents were deleted
    """
    uncommitted_filter = {'version': None}
    delete_result = self.options_collection.delete_many(uncommitted_filter)
    return delete_result.deleted_count

rollback_records()

Remove any uncommitted data changes.

This method has to interrogate every uncommitted record in the data collection to perform the rollback and therefore, depending on how much uncommitted data there is, may take a bit of time to run.

Source code in splitgill/manager.py
386
387
388
389
390
391
392
393
394
395
396
def rollback_records(self):
    """
    Discards any data changes that have not been committed yet.

    Every uncommitted record in the data collection has to be examined to undo its
    changes, so this can take a while when there is a lot of uncommitted data.
    """
    # nothing uncommitted means nothing to undo
    if not self.has_uncommitted_data():
        return

    rollback_ops = generate_rollback_ops(self.data_collection)
    for batch in partition(rollback_ops, 200):
        self.data_collection.bulk_write(batch)

search(version=SearchVersion.latest)

Creates a Search DSL object to use on this database's indexed data. This Search object will be setup with the appropriate index and version filter depending on the given version parameter, and the Elasticsearch client object in use on this database.

If a version number is passed as the version parameter, it will be checked against the latest version available in Elasticsearch. If it is below the latest version available in Elasticsearch, all indices will be searched and a term filter will be used to get the right data. If the version is equal to or above the latest version available in Elasticsearch, the latest index will be searched and no version term filter will be used. This is for Elasticsearch performance and caching.

If version is not an int but is instead a SearchVersion, this is used to set the index on the search object. SearchVersion.latest sets the index to this database's latest index, whereas SearchVersion.all sets the index to a wildcard to search on all indices used by this database.

Parameters:

Name Type Description Default
version Union[SearchVersion, int]

the version to search at, this should either be a SearchVersion enum option or an int version.

latest

Returns:

Type Description
Search

a Search DSL object

Source code in splitgill/manager.py
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
def search(
    self, version: Union[SearchVersion, int] = SearchVersion.latest
) -> Search:
    """
    Creates a Search DSL object to use on this database's indexed data. This Search
    object will be setup with the appropriate index and version filter depending on
    the given version parameter, and the Elasticsearch client object in use on this
    database.

    If a version number is passed as the version parameter, it will be checked
    against the latest version available in Elasticsearch. If it is below the latest
    version available in Elasticsearch, all indices will be searched and a term
    filter will be used to get the right data. If the version is equal to or above
    the latest version available in Elasticsearch, the latest index will be searched
    and no version term filter will be used. This is for Elasticsearch performance
    and caching.

    If version is not an int it is interpreted as a SearchVersion, either an enum
    member or one of its string values (e.g. "latest"). SearchVersion.latest sets
    the index to this database's latest index, whereas SearchVersion.all sets the
    index to a wildcard to search on all indices used by this database.

    :param version: the version to search at, this should either be a SearchVersion
        enum option (or its value) or an int version.
    :return: a Search DSL object
    :raises ValueError: if version is neither an int nor a valid SearchVersion
    """
    search = Search(using=self._client.elasticsearch)

    if isinstance(version, int):
        current_version = self.get_elasticsearch_version()
        if current_version is not None and current_version <= version:
            # the requested version is at or beyond the latest version in
            # elasticsearch, so use the latest index instead of a filter. It'll be
            # faster and more easily cachable for elasticsearch
            return search.index(self.indices.latest)
        search = search.index(self.indices.wildcard)
        return search.filter(version_query(version))

    # coerce to a SearchVersion so that an unrecognised value raises a ValueError.
    # Without this, an unmatched value would leave the Search with no index set,
    # which searches every index in the cluster rather than just this database's
    version = SearchVersion(version)
    if version == SearchVersion.latest:
        return search.index(self.indices.latest)
    if version == SearchVersion.all:
        return search.index(self.indices.wildcard)
    return search

sync(bulk_options=None, resync=False)

Synchronise the data/options in MongoDB with the data in Elasticsearch by updating the latest and old data indices as required.

To find the data that needs to be updated, the current version of the data in MongoDB is compared to the current version of the data in Elasticsearch, and the two are synced (assuming Elasticsearch's version is behind MongoDB's; if it is not, nothing is written).

While the data is being indexed, refreshing is paused on this database's indexes and only resumed once all the data has been indexed. If an error occurs during indexing then the refresh interval is not reset, meaning the updates that have made it to Elasticsearch will not impact searches until a refresh is triggered, or the refresh interval is reset. This makes the function roughly transactional. If no errors occur, the refresh interval is reset (along with the replica count) and a refresh is called. This means that if this function returns successfully, the data updated by it will be immediately available for searches.

Note that if resync=True, the arc indices will be deleted before reindexing begins! The latest index is not deleted because the documents in the latest index have IDs and so can be replaced directly.

Parameters:

Name Type Description Default
bulk_options Optional[BulkOptions]

options determining how the bulk operations are sent to Elasticsearch

None
resync bool

whether to resync all records with Elasticsearch regardless of the currently synced version. This won't delete any data in the latest index as those records are replaced during resync, but any arc indices will be deleted prior to re-indexing.

False

Returns:

Type Description
WriteResult

a WriteResult object

Source code in splitgill/manager.py
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
def sync(
    self, bulk_options: Optional[BulkOptions] = None, resync: bool = False
) -> WriteResult:
    """
    Synchronise the data/options in MongoDB with the data in Elasticsearch by
    updating the latest and old data indices as required.

    To find the data that needs to be updated, the current version of the data in
    MongoDB is compared to the current version of the data in Elasticsearch, and the
    two are synced (assuming Elasticsearch's version is behind MongoDB's; if it is
    not, nothing is written).

    While the data is being indexed, refreshing is paused on this database's indexes
    and only resumed once all the data has been indexed. If an error occurs during
    indexing then the refresh interval is not reset, meaning the updates that have
    made it to Elasticsearch will not impact searches until a refresh is triggered,
    or the refresh interval is reset. This makes the function roughly transactional.
    If no errors occur, the refresh interval is reset (along with the replica count)
    and a refresh is called. This means that if this function returns successfully,
    the data updated by it will be immediately available for searches.

    Note that if resync=True, the arc indices will be deleted before reindexing
    begins! The latest index is not deleted because the documents in the latest
    index have IDs and so can be replaced directly.

    :param bulk_options: options determining how the bulk operations are sent to
        Elasticsearch
    :param resync: whether to resync all records with Elasticsearch regardless of
        the currently synced version. This won't delete any data in the latest index
        as those records are replaced during resync, but any arc indices will be
        deleted prior to re-indexing.
    :return: a WriteResult object
    """
    if not self.has_data():
        return WriteResult()

    all_options = self.get_options(include_uncommitted=False)
    # a resync ignores whatever has already been synced and starts from scratch
    last_sync = None if resync else self.get_elasticsearch_version()

    if last_sync is None:
        # elasticsearch has nothing (or we're resyncing) so find all committed
        # records
        find_filter = {'version': {'$ne': None}}
    else:
        committed_version = self.get_committed_version()
        # defensive: if mongo reports no committed version there is nothing to
        # sync, and comparing an int with None would raise a TypeError. Otherwise
        # elasticsearch and mongo are in sync (use >= rather than == just in case
        # some bizarro-ness has occurred)
        if committed_version is None or last_sync >= committed_version:
            return WriteResult()
        if any(version > last_sync for version in all_options):
            # there's an options change ahead, this means we need to check all
            # records again, so filter out committed records only
            find_filter = {'version': {'$ne': None}}
        else:
            # find all the updated records that haven't had their updates synced yet
            find_filter = {'version': {'$gt': last_sync}}

    if resync:
        # delete all arcs
        res = self._client.elasticsearch.indices.get_alias(
            index=self.indices.arc_wildcard
        )
        for arc_index in res.keys():
            self._client.elasticsearch.indices.delete(index=arc_index)

    create_templates(self._client.elasticsearch)

    return write_ops(
        self._client.elasticsearch,
        generate_index_ops(
            self.indices,
            self.get_arc_status(),
            self.iter_records(filter=find_filter),
            all_options,
            last_sync,
        ),
        bulk_options,
    )

update_options(options, commit=True)

Update the parsing options for this database.

Parameters:

Name Type Description Default
options ParsingOptions

the new parsing options

required
commit

whether to commit the new config added with a new version after writing the config. Default: True.

True

Returns:

Type Description
Optional[int]

returns the new version if a commit happened, otherwise None. If a commit was requested but nothing was changed, None is returned.

Source code in splitgill/manager.py
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
def update_options(self, options: ParsingOptions, commit=True) -> Optional[int]:
    """
    Update the parsing options for this database.

    Any uncommitted options already stored against this database are removed before
    anything new is written. If the new options are equal to the latest committed
    options, nothing is written.

    :param options: the new parsing options
    :param commit: whether to commit the new config added with a new version after
        writing the config. Default: True.
    :return: returns the new version if a commit happened, otherwise None. If a
        commit was requested but nothing was changed, None is returned.
    """
    # get the latest options that have been committed. Uncommitted options are
    # explicitly excluded: they are stored with a None version, and if get_options
    # included them, max() would have to compare None against the int versions and
    # raise a TypeError
    all_options = self.get_options(include_uncommitted=False)
    latest_options = all_options[max(all_options)] if all_options else None

    # any uncommitted options are superseded by this update, so clear them out
    if self.has_uncommitted_options():
        self.rollback_options()

    # if the options are the same as the latest existing ones, don't update
    if latest_options == options:
        return None

    # either the options are completely new or they differ from the existing
    # options, write a fresh entry
    new_doc = {
        'name': self.name,
        # version = None to indicate this is an uncommitted change
        'version': None,
        'options': options.to_doc(),
    }
    self.options_collection.insert_one(new_doc)

    if commit:
        return self.commit()
    return None