Skip to content

Syncing

BulkOpException

Bases: Exception

Exception raised when errors are detected in the response of a bulk operation.

Source code in splitgill/indexing/syncing.py
34
35
36
37
38
39
40
41
class BulkOpException(Exception):
    """
    Exception raised when errors are detected in the response of a bulk operation.

    The errors passed in are available afterwards through the ``errors`` attribute.
    """

    def __init__(self, errors: List[dict]):
        """
        :param errors: the items from the bulk response which reported an error
        """
        message = f'{len(errors)} errors during bulk index.'
        # only include a sample if there is one to show, indexing into an empty list
        # would raise an IndexError and mask this exception entirely
        if errors:
            message = f'{message} Sample: {errors[0]}'
        super().__init__(message)
        self.errors = errors

BulkOptions dataclass

Options for writing bulk operations to Elasticsearch.

Source code in splitgill/indexing/syncing.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@dataclass
class BulkOptions:
    """
    Tunable settings controlling how bulk operations are written to Elasticsearch.
    """

    # how many ops are sent to Elasticsearch in a single bulk request
    chunk_size: int = 100
    # how many workers send requests concurrently
    worker_count: int = 2
    # chunks of ops wait in a bounded queue before a worker picks them up, the bound
    # is worker_count * buffer_multiplier. A bigger value uses more memory but means
    # workers spend less time idle waiting for something to send
    buffer_multiplier: int = 3

    @property
    def op_buffer_size(self) -> int:
        """
        :return: the maximum number of chunks to buffer, i.e. the worker count
                 multiplied by the buffer multiplier
        """
        workers, multiplier = self.worker_count, self.buffer_multiplier
        return workers * multiplier

WriteResult

Represents the outcome of writing a stream of bulk index operations to Elasticsearch.

Source code in splitgill/indexing/syncing.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class WriteResult:
    """
    Represents the outcome of writing a stream of bulk index operations to
    Elasticsearch.

    Exposes the sorted list of ``indices`` written to, plus the ``indexed`` and
    ``deleted`` counts summed over all workers.
    """

    def __init__(
        self,
        worker_counts: Optional[Iterable[Tuple[int, int]]] = None,
        indices: Optional[Set[str]] = None,
    ):
        """
        :param worker_counts: optional iterable of tuples containing the insert and
                              delete counts from each worker
        :param indices: optional set of indices operated on by the workers during the
                        load
        """
        self.indices = [] if indices is None else sorted(indices)
        # materialise the counts before looking at them: any iterable is allowed and
        # an iterator/generator is always truthy, so checking the raw argument for
        # emptiness would let an empty one through and break the summing
        counts = [] if worker_counts is None else list(worker_counts)
        self.indexed = sum(indexed for indexed, _ in counts)
        self.deleted = sum(deleted for _, deleted in counts)

    @property
    def total(self) -> int:
        """
        :return: the total number of operations written (indexed + deleted)
        """
        return self.indexed + self.deleted

__init__(worker_counts=None, indices=None)

Parameters:

Name Type Description Default
worker_counts Optional[Iterable[Tuple[int, int]]]

optional iterable of tuples containing the insert and delete counts from each worker

None
indices Optional[Set[str]]

optional set of indices operated on by the workers during the load

None
Source code in splitgill/indexing/syncing.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def __init__(
    self,
    worker_counts: Optional[Iterable[Tuple[int, int]]] = None,
    indices: Optional[Set[str]] = None,
):
    """
    :param worker_counts: optional iterable of tuples containing the insert and
                          delete counts from each worker
    :param indices: optional set of indices operated on by the workers during the load
    """
    self.indices = [] if indices is None else sorted(indices)
    # materialise the counts before looking at them: any iterable is allowed and an
    # iterator/generator is always truthy, so checking the raw argument for emptiness
    # would let an empty one through and break the summing
    counts = [] if worker_counts is None else list(worker_counts)
    self.indexed = sum(indexed for indexed, _ in counts)
    self.deleted = sum(deleted for _, deleted in counts)

check_for_errors(tasks)

Loop through the given worker tasks and check if any of them have finished. If a finished task was stopped by an exception, that exception is raised.

Parameters:

Name Type Description Default
tasks Set[Task]

the worker tasks

required
Source code in splitgill/indexing/syncing.py
195
196
197
198
199
200
201
202
203
204
205
206
207
def check_for_errors(tasks: Set[Task]):
    """
    Inspect the given worker tasks and, if one of the finished ones was stopped by an
    exception, raise that exception. Tasks which are still running, or which finished
    without an exception, are left alone.

    :param tasks: the worker tasks
    """
    for finished in filter(lambda task: task.done(), tasks):
        failure = finished.exception()
        if failure is None:
            continue
        raise failure

refresh(client, indices, attempts=3)

Given an Elasticsearch client and a list of indices, call refresh on the indices. If the request gets a connection timeout from Elasticsearch, the operation will be retried, up to attempts tries in total, with an increasing delay between each try.

Parameters:

Name Type Description Default
client Elasticsearch

the Elasticsearch client to use

required
indices List[str]

the indices to refresh

required
attempts int

the number of times to attempt before raising an exception

3
Source code in splitgill/indexing/syncing.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def refresh(client: Elasticsearch, indices: List[str], attempts: int = 3):
    """
    Given an Elasticsearch client and a list of indices, call refresh on the indices. If
    the request gets a connection timeout from Elasticsearch, the operation will be
    retried, up to `attempts` tries in total, with an increasing delay between each try.

    :param client: the Elasticsearch client to use
    :param indices: the indices to refresh
    :param attempts: the number of times to attempt before raising an exception. Values
                     lower than 1 are treated as 1, i.e. a single try with no retries
    :raises ConnectionTimeout: if every attempt timed out
    """
    sleep_duration = 1
    while True:
        try:
            client.indices.refresh(index=indices)
            return
        except ConnectionTimeout:
            attempts -= 1
            # <= rather than == so that an attempts value below 1 gives up after the
            # first failure instead of counting down past zero and retrying forever
            if attempts <= 0:
                raise
            time.sleep(sleep_duration)
            # wait a bit longer before each subsequent retry
            sleep_duration *= 1.5

setup_indices(client, indices) async

Sets up the given Elasticsearch indices with settings for optimal bulk indexing. Any index that doesn't exist will be created.

Parameters:

Name Type Description Default
client AsyncElasticsearch

an AsyncElasticsearch object

required
indices Iterable[str]

the indices to set up

required
Source code in splitgill/indexing/syncing.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
async def setup_indices(client: AsyncElasticsearch, indices: Iterable[str]):
    """
    Prepares each of the given Elasticsearch indices for bulk indexing by setting its
    refresh interval to -1 and its number of replicas to 0. Indices which don't exist
    yet are created first.

    :param client: an AsyncElasticsearch object
    :param indices: the indices to set up
    """
    for index in indices:
        already_exists = await client.indices.exists(index=index)
        if not already_exists:
            await client.indices.create(index=index)

        # settings applied for the duration of the bulk write
        bulk_settings = {'index': {'refresh_interval': -1, 'number_of_replicas': 0}}
        await client.indices.put_settings(body=bulk_settings, index=index)

worker(client, task_queue) async

Async worker function for sending chunks of bulk index operations to Elasticsearch.

Parameters:

Name Type Description Default
client AsyncElasticsearch

the AsyncElasticsearch client to use for requests

required
task_queue Queue

the queue to retrieve tasks from

required
Source code in splitgill/indexing/syncing.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
async def worker(client: AsyncElasticsearch, task_queue: Queue) -> Tuple[int, int]:
    """
    Async worker which pulls chunks of bulk operations off the queue and sends each one
    to Elasticsearch as a single bulk request. The worker stops when it pulls an empty
    chunk (e.g. None) off the queue.

    A request which fails with a connection timeout is retried up to 3 times with an
    exponentially increasing wait before each retry.

    :param client: the AsyncElasticsearch client to use for requests
    :param task_queue: the queue to retrieve tasks from
    :return: a tuple containing the number of index ops and the number of delete ops
             which were completed without error
    :raises BulkOpException: if any item in a bulk response reports an error
    :raises ConnectionTimeout: if a request still times out after the retries
    """
    max_retries = 3
    indexed, deleted = 0, 0

    while True:
        chunk: List[BulkOp] = await task_queue.get()
        if not chunk:
            # an empty chunk is the signal that there is no more work to do
            task_queue.task_done()
            return indexed, deleted

        # one initial try plus max_retries retries
        for attempt in range(max_retries + 1):
            try:
                response = await client.bulk(
                    operations=[op.serialise() for op in chunk], refresh=False
                )
                break
            except ConnectionTimeout:
                if attempt == max_retries:
                    # out of retries, let the timeout propagate
                    raise
                # back off before trying again
                await sleep(2**attempt)

        failed_items = []
        for item in response['items']:
            # each item is keyed by the action that was performed
            action = next(iter(item))
            if item[action].get('error'):
                failed_items.append(item)
            elif action == 'index':
                indexed += 1
            elif action == 'delete':
                deleted += 1

        if failed_items:
            raise BulkOpException(failed_items)

        task_queue.task_done()

write_ops(client, op_stream, options)

Write the given iterable of bulk index operations to Elasticsearch.

Parameters:

Name Type Description Default
client Elasticsearch

an Elasticsearch client. The bulk writes themselves are done asynchronously using the AsyncElasticsearch class, which is created from the node configurations pulled from this client; this client is then used to refresh the indices and reset their settings once the writes are complete

required
op_stream Iterable[BulkOp]

an iterable of BulkOp objects

required
options Optional[BulkOptions]

options determining how we do the bulk write

required

Returns:

Type Description
WriteResult

a WriteResult object

Source code in splitgill/indexing/syncing.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def write_ops(
    client: Elasticsearch,
    op_stream: Iterable[BulkOp],
    options: Optional[BulkOptions] = None,
) -> WriteResult:
    """
    Write the given iterable of bulk index operations to Elasticsearch.

    Once the writes are complete, the indices which were written to are refreshed and
    their refresh interval and replica count settings are reset.

    :param client: an Elasticsearch client. The bulk writes themselves are done
        asynchronously using the AsyncElasticsearch class, which is created from the
        node configurations pulled from this client; this client is then used to
        refresh the indices and reset their settings once the writes are complete
    :param op_stream: an iterable of BulkOp objects
    :param options: options determining how we do the bulk write, if None (the default)
        a BulkOptions object with the default values is used
    :return: a WriteResult object
    """
    node_configs = [node.config for node in client.transport.node_pool.all()]
    if options is None:
        options = BulkOptions()

    result = run(write_ops_async(node_configs, op_stream, options))

    # if nothing was written there are no indices to tidy up. We must not fall through
    # in this case: a request made with an empty list of indices carries no index in
    # its path, so the settings reset below would be applied to every index
    if not result.indices:
        return result

    # refresh all indices to make the changes visible all at once
    refresh(client, result.indices)

    # return the indices back to their original settings as defined in the template
    client.indices.put_settings(
        body={'index': {'refresh_interval': None, 'number_of_replicas': None}},
        index=result.indices,
    )

    return result

write_ops_async(hosts, op_stream, options) async

Writes the given iterable of bulk index operations to Elasticsearch using a set of async workers for maximum efficiency.

Parameters:

Name Type Description Default
hosts List[NodeConfig]

a list of elasticsearch node configurations

required
op_stream Iterable[BulkOp]

an iterable of BulkOp objects

required
options BulkOptions

options determining how we do the bulk write

required

Returns:

Type Description
WriteResult

a WriteResult object

Source code in splitgill/indexing/syncing.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
async def _put_unless_failed(task_queue: Queue, item, workers: Set[Task]):
    """
    Put the item on the queue, raising a worker's exception instead of blocking forever
    if a worker fails while we are waiting for space on the queue.

    :param task_queue: the queue to put the item on
    :param item: the item to put on the queue
    :param workers: the worker tasks consuming from the queue
    """
    from asyncio import FIRST_COMPLETED, wait

    put_task = create_task(task_queue.put(item))
    try:
        while not put_task.done():
            check_for_errors(workers)
            # wake up when either the put goes through or a running worker finishes
            running = {task for task in workers if not task.done()}
            await wait(running | {put_task}, return_when=FIRST_COMPLETED)
        await put_task
    finally:
        # this is a no-op if the put completed
        put_task.cancel()


async def write_ops_async(
    hosts: List[NodeConfig], op_stream: Iterable[BulkOp], options: BulkOptions
) -> WriteResult:
    """
    Writes the given iterable of bulk index operations to Elasticsearch using a set of
    async workers for maximum efficiency.

    If a worker fails, its exception is raised from this function and any workers
    which are still running are cancelled.

    :param hosts: a list of elasticsearch node configurations
    :param op_stream: an iterable of BulkOp objects
    :param options: options determining how we do the bulk write
    :return: a WriteResult object
    """
    client = AsyncElasticsearch(hosts)
    workers: Set[Task] = set()
    try:
        task_queue = Queue(maxsize=options.op_buffer_size)

        # set up the workers
        workers = {
            create_task(worker(client, task_queue)) for _ in range(options.worker_count)
        }

        indices_seen = set()

        for chunk in partition(op_stream, options.chunk_size):
            # track the indices we come across so that we can set up the optimal index
            # settings for ingestion
            new_indices = {op.index for op in chunk if op.index not in indices_seen}
            if new_indices:
                indices_seen.update(new_indices)
                await setup_indices(client, new_indices)
            # relinquish control so that the workers can do some work
            await sleep(0)
            # put the next task on the queue
            await _put_unless_failed(task_queue, chunk, workers)
            # check to see if any workers have raised exceptions
            check_for_errors(workers)

        # tell the workers we're done, one stop signal for each of them
        for _ in range(len(workers)):
            await _put_unless_failed(task_queue, None, workers)

        # wait for the workers to finish up and collect their results. We wait on the
        # workers rather than joining the queue because a failed worker leaves items on
        # the queue unfinished, which would make a join wait forever
        worker_counts = await gather(*workers)
        return WriteResult(worker_counts, indices_seen)
    finally:
        try:
            # stop any workers still running (this does nothing to finished workers)
            # and wait for them to wind down before the client is closed under them
            for task in workers:
                task.cancel()
            await gather(*workers, return_exceptions=True)
        finally:
            await client.close()