Skip to content

Commit cee0e45

Browse files
committed
Output/logging updates, simplification of multiproc command line arguments.
1 parent 86f8ac7 commit cee0e45

File tree

8 files changed

+48
-41
lines changed

8 files changed

+48
-41
lines changed

CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
#### 2018-09-17 version 0.1.2
2+
3+
* Simplified command arguments for multiprocessing
4+
* Moved from print statements to pure Python logging-based output
5+
*
6+
17
#### 2018-09-15 version 0.1
28

39
* Initial release

esdocs/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import logging
2727

2828
__appname__ = __package__
29-
__version__ = "0.1.1"
29+
__version__ = "0.1.2"
3030

3131
app_version = "{}/{}".format(__appname__, __version__)
3232

esdocs/contrib/esdjango/run.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def run():
3434
# in esdocs.contrib.esdjango.apps
3535
django.setup()
3636
except ImportError:
37-
print("esdocs-django must be run from the root of your Django project (where manage.py lives).")
37+
logger.info("esdocs-django must be run from the root of your Django project (where manage.py lives).")
3838
return
3939

4040
base_run(DjangoController)

esdocs/contrib/esdjango/serializer.py

-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ def fetch_data(cls, **kwargs):
5050
if end and queryset_end > end:
5151
queryset_end = end
5252

53-
print('.', end='', flush=True)
5453
n = 0
5554
for n, row in enumerate(queryset[queryset_start: queryset_end]):
5655
yield row

esdocs/controller.py

+20-22
Original file line numberDiff line numberDiff line change
@@ -86,60 +86,58 @@ def run_operation(self, **options):
8686
pass
8787

8888
def index_list(self, **options):
89-
print("Known, managed indexes{}:".format(" (limited results due to --indexes option)" if self.index_names else ""))
89+
logger.info("Known, managed indexes{}:".format(
90+
" (limited results due to --indexes option)" if self.index_names else ""))
9091
for name, index in self.indexes.items():
91-
print(" - {}".format(name))
92+
logger.info(" - {}".format(name))
9293

9394
def index_init(self, **options):
9495
for name, index in self.new_indexes.items():
9596
if not self.client.indices.exists(name):
9697
self._index_create(index, name, True)
9798
else:
98-
print("Index '{}' already exists. No change made.".format(index._name))
99+
logger.info("Index '{}' already exists. No change made.".format(index._name))
99100

100101
def index_update(self, **options):
101102
for name, index in self.indexes.items():
102103
if not self.client.indices.exists(name):
103104
self._index_create(self.new_indexes[name], name, True)
104105
else:
105-
# index.close(using=self.using)
106106
try:
107107
index.save(using=self.using)
108-
print("Updated index mapping for '{}'.".format(name))
108+
logger.info("Updated index mapping for '{}'.".format(name))
109109
except elasticsearch.exceptions.RequestError as e:
110-
print(str(e))
111-
# index.open(using=self.using)
110+
logger.info(str(e))
112111

113112
def index_rebuild(self, **options):
114-
multiproc = options.get('multiproc', False)
115-
if multiproc:
113+
if options.get('multi') is not None:
116114
policy = ParallelStreamingPolicy(self.parallel_prep)
117115
else:
118116
policy = StreamingPolicy()
119117

120118
for name, index in self.new_indexes.items():
121119
self._index_create(index, name, set_alias=False)
122120

123-
print("Updating index settings to be bulk-indexing friendly...")
121+
logger.info("Updating index settings to be bulk-indexing friendly...")
124122
original_settings = index.get_settings(using=self.using).get(index._name, {}).get('settings', {})
125123
index.put_settings(body={
126124
"index.number_of_replicas": 0,
127125
"index.refresh_interval": '-1'
128126
})
129127

130-
print("Indexing data for '{}'...".format(index._name))
128+
logger.info("Indexing data for '{}'...".format(index._name))
131129

132130
for serializer in self._serializers[name]:
133-
print(" - processing '{}' documents: ".format(serializer.document.__name__), end='', flush=True)
131+
logger.info(" - processing '{}' documents".format(serializer.document.__name__))
134132
policy.bulk_operation(serializer, index=index._name, client=self.client, **options)
135-
print()
133+
logger.info()
136134

137-
print("Data indexed data for '{}'.".format(index._name))
135+
logger.info("Data indexed data for '{}'.".format(index._name))
138136

139-
print("Force merging index data...")
137+
logger.info("Force merging index data...")
140138
index.forcemerge()
141139

142-
print("Restoring original/default index settings...")
140+
logger.info("Restoring original/default index settings...")
143141
index.put_settings(body={
144142
"index.number_of_replicas": original_settings.get('index', {}).get('number_of_replicas', 1),
145143
"index.refresh_interval": original_settings.get('index', {}).get('refresh_interval', '1s')
@@ -153,7 +151,7 @@ def index_rebuild(self, **options):
153151
{'add': {'index': index._name, 'alias': name}}
154152
]
155153
})
156-
print("Created alias '{}' for '{}'.".format(name, index._name))
154+
logger.info("Created alias '{}' for '{}'.".format(name, index._name))
157155

158156
policy.close()
159157

@@ -166,10 +164,10 @@ def index_cleanup(self, **options):
166164
def _index_create(self, index, alias, set_alias=False):
167165
if not set_alias:
168166
index.create(using=self.using)
169-
print("Created index '{}', no alias set.".format(index._name))
167+
logger.info("Created index '{}', no alias set.".format(index._name))
170168
else:
171169
index.aliases(**{alias: {}}).create(using=self.using)
172-
print("Created index '{}', aliased to '{}'.".format(index._name, alias))
170+
logger.info("Created index '{}', aliased to '{}'.".format(index._name, alias))
173171

174172
def _indexes_delete(self, **options):
175173
old_indexes = []
@@ -182,9 +180,9 @@ def _indexes_delete(self, **options):
182180
no_input = options.get('no_input', False)
183181
if no_input:
184182
self.client.indices.delete(",".join(old_indexes))
185-
print("Deleting old unaliased indexes:")
183+
logger.info("Deleting old unaliased indexes:")
186184
for _old in old_indexes:
187-
print(" - deleted index '{}'".format(_old))
185+
logger.info(" - deleted index '{}'".format(_old))
188186
else:
189187
for _old in old_indexes:
190188
user_input = 'y' if no_input else ''
@@ -194,4 +192,4 @@ def _indexes_delete(self, **options):
194192
break
195193
if user_input == 'y':
196194
self.client.indices.delete(_old)
197-
print(" - deleted index '{}'".format(_old))
195+
logger.info(" - deleted index '{}'".format(_old))

esdocs/policies.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def __init__(self, parallel_prep):
7373
self.parallel_prep = parallel_prep
7474

7575
def bulk_operation(self, serializer, index, client, **options):
76-
procs = options.get('processes')
76+
procs = options.get('multi')
7777
if not procs:
7878
procs = os.cpu_count()
7979
procs = procs if procs > 1 else 2

esdocs/serializer.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -243,5 +243,4 @@ def bulk_operation(cls, index=None, client=None, **options):
243243
if not ok:
244244
action, result = result.popitem()
245245
doc_id = '/%s/doc/%s' % (index, result['_id'])
246-
print('Failed to %s document %s:' % (action, doc_id,), result)
247-
# break
246+
logger.warning('Failed to {} document {}: {}'.format(action, doc_id, result))

esdocs/utils.py

+18-13
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,9 @@ def add_parser_arguments(parser):
3232
help="Comma-separate list of index names to target")
3333
parent.add_argument('--using', action='store', default=None,
3434
help="Elasticsearch named connection to use")
35-
36-
parent.add_argument('--multiproc', action='store_true', default=False,
37-
help="Enable multiple process indexing")
38-
parent.add_argument('--numprocs', action='store', type=int, default=None,
39-
help="How many processes to use for indexing; "
40-
"default is CPU core count")
35+
parent.add_argument('--multi', nargs='?', const=0, type=int,
36+
help="Enable multiple processes and optionally set number of "
37+
"CPU cores to use (defaults to all cores)")
4138

4239
# hack to get comment args into the main parser;
4340
# stolen from python argparse source code
@@ -50,15 +47,17 @@ def add_parser_arguments(parser):
5047
parser._defaults.update(defaults)
5148

5249
sps = parser.add_subparsers(title="commands", dest='action')
53-
# this is a hack for a Django 1.11 bug: https://code.djangoproject.com/ticket/29295;
50+
# this is a hack for a Django 1.11 bug:
51+
# https://code.djangoproject.com/ticket/29295;
5452
# TODO: remove when we drop support for broken Django versions
5553
sps._parser_class = argparse.ArgumentParser
5654

5755
p = sps.add_parser('list', help="List indexes", parents=[parent])
5856
p = sps.add_parser('init', help="Initialize indexes", parents=[parent])
5957
p = sps.add_parser('update', help="Update indexes", parents=[parent])
6058
p = sps.add_parser('rebuild', help="Rebuild indexes", parents=[parent])
61-
p.add_argument('--cleanup', action="store_true", dest='delete_old_indexes', default=False)
59+
p.add_argument('--cleanup', action="store_true", dest='delete_old_indexes',
60+
default=False)
6261
p = sps.add_parser('cleanup', help="Delete unaliased indexes", parents=[parent])
6362

6463

@@ -69,7 +68,8 @@ def run(controller_klass=None):
6968
from . import app_version, logger as parent_logger
7069

7170
parser = argparse.ArgumentParser()
72-
parser.add_argument("-v", "--verbose", action="store_true", help="increase output verbosity")
71+
parser.add_argument("-v", "--verbose", action="store_true",
72+
help="increase output verbosity")
7373
parser.add_argument("--version", action="version", version=app_version)
7474
add_parser_arguments(parser)
7575
args = parser.parse_args()
@@ -86,12 +86,17 @@ def run(controller_klass=None):
8686

8787
parent_logger.addHandler(logh)
8888

89-
logger.info('Logging enabled at {} verbosity'.format(logging.getLevelName(logger.getEffectiveLevel())))
90-
logger.info('Multi-process indexing: {}'.format('available' if gevent_enabled else 'unavailable (set environment variable ESDOCS_GEVENT=1)'))
89+
logger.info('Logging enabled at {} verbosity'.format(
90+
logging.getLevelName(logger.getEffectiveLevel())))
9191

92-
if not gevent_enabled and args.multiproc:
93-
logger.error('Multi-process indexing not available unless environment variable ESDOCS_GEVENT=1 is set, stopping...')
92+
if not gevent_enabled and args.multi is not None:
93+
logger.error('Multi-process indexing not available unless environment '
94+
'variable ESDOCS_GEVENT=1 is set, stopping...')
9495
return
96+
else:
97+
logger.info('Multi-process indexing: {}'.format(
98+
'available' if gevent_enabled else 'unavailable (set environment '
99+
'variable ESDOCS_GEVENT=1)'))
95100

96101
register_serializers(os.getenv('ESDOCS_SERIALIZER_MODULES'))
97102
Serializer.register_hooks(os.getenv('ESDOCS_SERIALIZER_COMPATIBILITY_HOOKS'))

0 commit comments

Comments
 (0)