Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

uploader: add options --dry_run and --one_shot #3707

Merged
merged 13 commits into from
Jun 5, 2020
24 changes: 24 additions & 0 deletions tensorboard/uploader/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ py_library(
visibility = ["//tensorboard:internal"],
deps = [
":auth",
":dry_run_stubs",
":exporter",
":flags_parser",
":formatters",
Expand Down Expand Up @@ -121,9 +122,12 @@ py_test(
srcs = ["uploader_test.py"],
srcs_version = "PY3",
deps = [
":dry_run_stubs",
":server_info",
":test_util",
":upload_tracker",
":uploader",
":uploader_subcommand",
":util",
"//tensorboard:data_compat",
"//tensorboard:dataclass_compat",
Expand Down Expand Up @@ -156,6 +160,26 @@ py_test(
],
)

py_library(
name = "dry_run_stubs",
srcs = ["dry_run_stubs.py"],
deps = [
"//tensorboard/uploader/proto:protos_all_py_pb2",
"//tensorboard/uploader/proto:protos_all_py_pb2_grpc",
],
)

py_test(
name = "dry_run_stubs_test",
srcs = ["dry_run_stubs_test.py"],
srcs_version = "PY3",
deps = [
":dry_run_stubs",
"//tensorboard:test",
"//tensorboard/uploader/proto:protos_all_py_pb2",
],
)

py_library(
name = "auth",
srcs = ["auth.py"],
Expand Down
57 changes: 57 additions & 0 deletions tensorboard/uploader/dry_run_stubs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Dry-run stubs for various rpc services."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from tensorboard.uploader.proto import write_service_pb2
from tensorboard.uploader.proto import write_service_pb2_grpc


class DryRunTensorBoardWriterStub(object):
"""A dry-run TensorBoardWriter gRPC Server.

Only the methods used by the `tensorboard dev upload` are
mocked out in this class.

When additional methods start to be used by the command,
their mocks should be added to this class.
"""

def CreateExperiment(self, request, **kwargs):
"""Create a new experiment and remember it has been created."""
del request, kwargs # Unused.
return write_service_pb2.CreateExperimentResponse()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little surprised that all the responses can be empty here (except the BlobSequence case, I see). OK, assuming you've tested a dry run with scalar, tensor, and blob data.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. Manually tested such logdirs with --dry_run.


def WriteScalar(self, request, **kwargs):
del request, kwargs # Unused.
return write_service_pb2.WriteScalarResponse()

def WriteTensor(self, request, **kwargs):
del request, kwargs # Unused.
return write_service_pb2.WriteTensorResponse()

def GetOrCreateBlobSequence(self, request, **kwargs):
del request, kwargs # Unused.
return write_service_pb2.GetOrCreateBlobSequenceResponse(
blob_sequence_id="dummy_blob_sequence_id"
)

def WriteBlob(self, request, **kwargs):
del kwargs # Unused.
for item in request:
yield write_service_pb2.WriteBlobResponse()
55 changes: 55 additions & 0 deletions tensorboard/uploader/dry_run_stubs_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for dry-run rpc servicers."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from tensorboard import test as tb_test
from tensorboard.uploader import dry_run_stubs
from tensorboard.uploader.proto import write_service_pb2


class DryRunTensorBoardWriterServicerTest(tb_test.TestCase):
def setUp(self):
super(DryRunTensorBoardWriterServicerTest, self).setUp()
self._stub = dry_run_stubs.DryRunTensorBoardWriterStub()

def testCreateExperiment(self):
self._stub.CreateExperiment(write_service_pb2.CreateExperimentRequest())

def testWriteScalar(self):
self._stub.WriteScalar(write_service_pb2.WriteScalarRequest())

def testWriteTensor(self):
self._stub.WriteTensor(write_service_pb2.WriteTensorRequest())

def testGetOrCreateBlobSequence(self):
self._stub.GetOrCreateBlobSequence(
write_service_pb2.GetOrCreateBlobSequenceRequest()
)

def testWriteBlob(self):
def dummy_iterator():
yield write_service_pb2.WriteBlobRequest()
yield write_service_pb2.WriteBlobRequest()

for response in self._stub.WriteBlob(dummy_iterator()):
self.assertTrue(response)


if __name__ == "__main__":
tb_test.main()
15 changes: 15 additions & 0 deletions tensorboard/uploader/flags_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,21 @@ def define_flags(parser):
"0: no statistics printed during uploading. 1 (default): print data "
"statistics as data is uploaded.",
)
upload.add_argument(
"--dry_run",
action="store_true",
help="Perform a dry run of uploading. In a dry run, the data is read "
"from the logdir as pointed to by the --logdir flag and statistics are "
"displayed (if --verbose is not 0), but no data is actually uploaded "
"to the server.",
)
upload.add_argument(
"--one_shot",
action="store_true",
help="Upload only the existing data in the logdir and then exit "
"immediately, instead of continuing to listen for new data in the "
"logdir.",
)
upload.add_argument(
"--plugins",
type=lambda option: option.split(","),
Expand Down
39 changes: 38 additions & 1 deletion tensorboard/uploader/upload_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from __future__ import division
from __future__ import print_function

from absl import logging

import contextlib
from datetime import datetime
import sys
Expand Down Expand Up @@ -156,6 +158,21 @@ def blob_bytes_skipped(self):
def plugin_names(self):
return self._plugin_names

def has_data(self):
"""Has any data been tracked by this instance.

This counts the tensor and blob data that have been scanned
but skipped.

Returns:
Whether this stats tracking object has tracked any data.
"""
return (
self._num_scalars > 0
or self._num_tensors > 0
or self._num_blobs > 0
)

def summarize(self):
"""Get a summary string for actually-uploaded and skipped data.

Expand Down Expand Up @@ -255,6 +272,7 @@ def __init__(self, verbosity):
)
self._verbosity = verbosity
self._stats = UploadStats()
self._send_count = 0

def _dummy_generator(self):
while True:
Expand All @@ -264,14 +282,30 @@ def _dummy_generator(self):
def _update_uploading_status(self, message, color_code=_STYLE_GREEN):
if not self._verbosity:
return

message += "." * 3
sys.stdout.write(
_STYLE_ERASE_LINE + color_code + message + _STYLE_RESET + "\r"
)
sys.stdout.flush()

def _upload_start(self):
"""Write an update indicating the start of the uploading."""
if not self._verbosity:
return
start_message = "%s[%s]%s Uploader started.\n" % (
_STYLE_BOLD,
readable_time_string(),
_STYLE_RESET,
)
sys.stdout.write(start_message)
sys.stdout.flush()

def has_data(self):
"""Determine if any data has been uploaded under the tracker's watch."""
return self._stats.has_data()

def _update_cumulative_status(self):
"""Write an update summarizing the data uploaded since the start."""
if not self._verbosity:
return
if not self._stats.has_new_data_since_last_summarize():
Expand Down Expand Up @@ -299,6 +333,9 @@ def add_plugin_name(self, plugin_name):
@contextlib.contextmanager
def send_tracker(self):
"""Create a context manager for a round of data sending."""
self._send_count += 1
if self._send_count == 1:
self._upload_start()
try:
# self._reset_bars()
self._update_uploading_status("Data upload starting")
Expand Down
Loading