Issue/279 code upload #545

Merged 7 commits on Feb 9, 2018
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,8 @@
v 2018.2 (2018-04-xx)
Changes in this release:
- The internal storage format for code is optimized. This introduces API and schema changes.
  This release supports both storage versions. The old version will be removed in the next release.

v 2018.1 (2018-02-09)
Changes in this release:
- Various bugfixes and performance enhancements
5 changes: 5 additions & 0 deletions src/inmanta/config.py
@@ -253,6 +253,11 @@ def get_default_value(self):
        else:
            return defa

    def set(self, value):
        """Only for tests"""
        cfg = Config._get_instance()
        cfg.set(self.section, self.name, value)

#############################
# Config
#
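The new Option.set helper only writes through to the in-process Config instance, so tests can flip server options without a config file. A minimal sketch of such a test (the test itself and the parsing of "false" by the is_bool validator are assumptions, not part of this PR):

# Hypothetical test-only usage of Option.set (illustrative).
from inmanta.server import config as opt

def test_corrupt_file_handling():
    # Override the option on the shared Config instance; Option.set is meant for tests only.
    opt.server_delete_currupt_files.set("false")
    # Assumes the is_bool validator maps the string "false" to False.
    assert opt.server_delete_currupt_files.get() is False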
12 changes: 11 additions & 1 deletion src/inmanta/data.py
@@ -1484,13 +1484,17 @@ class Code(BaseDocument):

    :param environment: The environment this code belongs to
    :param version: The version of the configuration model it belongs to
    :param sources: The source code of plugins (being phased out), in the form
                    {code_hash: (file_name, provider.__module__, source_code, [req])}
    :param requires: Python requirements for the source code above
    :param source_refs: File hashes referring to files in the file store, in the form
                        {code_hash: (file_name, provider.__module__, [req])}
    """
    environment = Field(field_type=uuid.UUID, required=True)
    resource = Field(field_type=str, required=True)
    version = Field(field_type=int, required=True)
    sources = Field(field_type=dict)
    source_refs = Field(field_type=dict)

    __indexes__ = [
        dict(keys=[("environment", pymongo.ASCENDING), ("version", pymongo.ASCENDING), ("resource", pymongo.ASCENDING)])
@@ -1505,6 +1509,12 @@ def get_version(cls, environment, version, resource):

        return codes[0]

    @classmethod
    @gen.coroutine
    def get_versions(cls, environment, version):
        codes = yield cls.get_list(environment=environment, version=version)
        return codes


class DryRun(BaseDocument):
"""
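To make the two storage shapes above concrete, here is an illustrative sketch of the same plugin stored in the old inline format and in the new file-store format (the hash, file name, and module are made up):

# Old format: source code stored inline in the Code document.
sources = {
    "3f786850e387550fdab836ed7e6dc881de23001b": (
        "my_module/plugins/__init__.py",   # file_name
        "inmanta_plugins.my_module",       # provider.__module__
        "def handler():\n    pass\n",      # source_code, inline
        ["requests"],                      # req: python requirements
    ),
}

# New format: only a reference; the source itself lives in the server file store under its hash.
source_refs = {
    "3f786850e387550fdab836ed7e6dc881de23001b": (
        "my_module/plugins/__init__.py",
        "inmanta_plugins.my_module",
        ["requests"],
    ),
}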
41 changes: 26 additions & 15 deletions src/inmanta/export.py
@@ -16,7 +16,6 @@
Contact: [email protected]
"""

import hashlib
import logging
import os
import time
@@ -32,6 +31,7 @@
from inmanta.ast import RuntimeException, CompilerException
from tornado.ioloop import IOLoop
from tornado import gen
from inmanta.util import hash_file

LOGGER = logging.getLogger(__name__)

@@ -60,6 +60,29 @@ def __str__(self, *args, **kwargs):
return "Cycle in dependencies: %s" % self.cycle


@gen.coroutine
def upload_code(conn, tid, version, resource_to_sourcemap):
    # Flatten the per-resource source maps into one {hash: source_code} dict
    allfiles = {myhash: source_code for sourcemap in resource_to_sourcemap.values()
                for myhash, (file_name, module, source_code, req) in sourcemap.items()}

    res = yield conn.stat_files(list(allfiles.keys()))
    if res is None or res.code != 200:
        raise Exception("Unable to upload handler plugin code to the server (msg: %s)" % res.result)

    # stat_files returns the hashes the server does not have yet; upload only those
    for file in res.result["files"]:
        res = yield conn.upload_file(id=file, content=base64.b64encode(allfiles[file].encode()).decode("ascii"))
        if res is None or res.code != 200:
            raise Exception("Unable to upload handler plugin code to the server (msg: %s)" % res.result)

    # Register the source map, with the inline source code replaced by file references
    compactmap = {resource: {myhash: (file_name, module, req) for
                             myhash, (file_name, module, source_code, req) in sourcemap.items()}
                  for resource, sourcemap in resource_to_sourcemap.items()}

    res = yield conn.upload_code_batched(tid=tid, id=version, resources=compactmap)
    if res is None or res.code != 200:
        raise Exception("Unable to upload handler plugin code to the server (msg: %s)" % res.result)


class Exporter(object):
"""
This class handles exporting the compiled configuration model
@@ -354,10 +377,7 @@ def merge_dict(a, b):

        @gen.coroutine
        def call():
            for myresource, mysources in sources.items():
                res = yield conn.upload_code(tid=tid, id=version, resource=myresource, sources=mysources)
                if res is None or res.code != 200:
                    raise Exception("Unable to upload handler plugin code to the server (msg: %s)" % res.result)
            yield upload_code(conn, tid, version, sources)

        self.run_sync(call)

Expand Down Expand Up @@ -431,15 +451,6 @@ def get_unknown_resources(self, hostname):

        return set()

    def _hash_file(self, content):
        """
        Create a hash from the given content
        """
        sha1sum = hashlib.new("sha1")
        sha1sum.update(content)

        return sha1sum.hexdigest()

    def upload_file(self, content=None):
        """
        Upload a file to the configuration server. This operation is not
@@ -448,7 +459,7 @@ def upload_file(self, content=None):
        if not isinstance(content, bytes):
            content = content.encode('utf-8')

        hash_id = self._hash_file(content)
        hash_id = hash_file(content)
        self._file_store[hash_id] = content

        return hash_id
18 changes: 18 additions & 0 deletions src/inmanta/methods.py
@@ -778,6 +778,7 @@ def upload_code(self, tid: uuid.UUID, id: int, resource: str, sources: dict):
        :param tid: The environment the code belongs to
        :param id: The id (version) of the configuration model
        :param sources: The source files that contain handlers and inmanta plug-ins
                        {code_hash: (file_name, provider.__module__, source_code, [req])}
        """

@protocol(operation="GET", id=True, agent_server=True, arg_options=ENV_OPTS, client_types=["agent"])
@@ -790,6 +791,23 @@ def get_code(self, tid: uuid.UUID, id: int, resource: str):
"""


class CodeBatchedMethod(Method):
    """
    Upload code to the server
    """
    __method_name__ = "codebatched"

    @protocol(operation="PUT", id=True, arg_options=ENV_OPTS, client_types=["compiler"])
    def upload_code_batched(self, tid: uuid.UUID, id: int, resources: dict):
        """
        Upload the supporting code to the server

        :param tid: The environment the code belongs to
        :param id: The id (version) of the configuration model
        :param resources: a dict mapping resource types to source maps of the form
                          {code_hash: (file_name, provider.__module__, [requirements])}
        """


class FileDiff(Method):
"""
Generate download the diff of two hashes
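For reference, a sketch of a compiler-side call to the batched endpoint; the resource type, hash, file name, and requirement are illustrative, and client/env_id/version stand for whatever protocol client and identifiers the caller already has (compare upload_code in src/inmanta/export.py):

# Illustrative payload for upload_code_batched; all values are made up.
resources = {
    "std::File": {
        "3f786850e387550fdab836ed7e6dc881de23001b": (
            "std/plugins/resources.py",   # file name
            "inmanta_plugins.std",        # provider.__module__
            ["jinja2"],                   # python requirements
        ),
    },
}
# Every referenced hash must already be in the file store (see stat_files/upload_file),
# otherwise the server rejects the call and returns the unknown references.
res = yield client.upload_code_batched(tid=env_id, id=version, resources=resources)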
6 changes: 6 additions & 0 deletions src/inmanta/server/config.py
@@ -98,6 +98,11 @@ def validate_fact_renew(value):
agent_timeout = Option("server", "agent-timeout", 30,
                       "Time before an agent is considered to be offline", is_time)

server_delete_currupt_files = Option("server", "delete_currupt_files", True,
                                     "The server logs an error when it detects that a file is corrupted. When set to true, "
                                     "the server will also delete the file, so on subsequent compiles the missing file will "
                                     "be recreated.", is_bool)

#############################
# Dashboard
#############################
@@ -116,5 +121,6 @@ def default_hangtime():
""" server.agent-timeout*3/4 """
return str(int(agent_timeout.get() * 3 / 4))


agent_hangtime = Option("server", "agent-hold", default_hangtime,
                        "Maximal time the server will hold an agent heartbeat call", is_time)
122 changes: 117 additions & 5 deletions src/inmanta/server/server.py
@@ -45,7 +45,7 @@
from inmanta.server import config as opt
from inmanta.server.agentmanager import AgentManager
import json

from inmanta.util import hash_file

LOGGER = logging.getLogger(__name__)
agent_lock = locks.Lock()
@@ -529,13 +529,20 @@ def delete_record(self, env, record_id):
    @protocol.handle(methods.FileMethod.upload_file, file_hash="id")
    @gen.coroutine
    def upload_file(self, file_hash, content):
        content = base64.b64decode(content)
        return self.upload_file_internal(file_hash, content)

    def upload_file_internal(self, file_hash, content):
        file_name = os.path.join(self._server_storage["files"], file_hash)

        if os.path.exists(file_name):
            return 500, {"message": "A file with this id already exists."}

        if hash_file(content) != file_hash:
            return 400, {"message": "The hash does not match the content"}

        with open(file_name, "wb+") as fd:
            fd.write(base64.b64decode(content))
            fd.write(content)

        return 200

@@ -552,14 +559,43 @@ def stat_file(self, file_hash):
    @protocol.handle(methods.FileMethod.get_file, file_hash="id")
    @gen.coroutine
    def get_file(self, file_hash):
        ret, c = self.get_file_internal(file_hash)
        if ret == 200:
            return 200, {"content": base64.b64encode(c).decode("ascii")}
        else:
            return ret, c

    def get_file_internal(self, file_hash):
        """get_file, but on return code 200, content is not encoded """

        file_name = os.path.join(self._server_storage["files"], file_hash)

        if not os.path.exists(file_name):
            return 404

        else:
            with open(file_name, "rb") as fd:
                return 200, {"content": base64.b64encode(fd.read()).decode("ascii")}
                content = fd.read()
                actualhash = hash_file(content)
                if actualhash != file_hash:
                    if opt.server_delete_currupt_files.get():
                        LOGGER.error("File corrupt, expected hash %s but found %s at %s, Deleting file" %
                                     (file_hash, actualhash, file_name))
                        try:
                            os.remove(file_name)
                        except OSError:
                            LOGGER.exception("Failed to delete file %s" % (file_name))
                            return 500, {"message": ("File corrupt, expected hash %s but found %s,"
                                                     " Failed to delete file, please contact the server administrator"
                                                     ) % (file_hash, actualhash)}
                        return 500, {"message": ("File corrupt, expected hash %s but found %s, "
                                                 "Deleting file, please re-upload the corrupt file"
                                                 ) % (file_hash, actualhash)}
                    else:
                        LOGGER.error("File corrupt, expected hash %s but found %s at %s" % (file_hash, actualhash, file_name))
                        return 500, {"message": ("File corrupt, expected hash %s but found %s,"
                                                 " please contact the server administrator") % (file_hash, actualhash)}
                return 200, content

    @protocol.handle(methods.FileMethod.stat_files)
    @gen.coroutine
@@ -1001,19 +1037,95 @@ def upload_code(self, env, code_id, resource, sources):
        if code is not None:
            return 500, {"message": "Code for this version has already been uploaded."}

        code = data.Code(environment=env.id, version=code_id, resource=resource, sources=sources)
        hasherrors = any((k != hash_file(content[2].encode()) for k, content in sources.items()))
        if hasherrors:
            return 400, {"message": "Hashes in the source map do not match the source code"}

        ret, to_upload = yield self.stat_files(sources.keys())

        if ret != 200:
            return ret, to_upload

        for file_hash in to_upload["files"]:
            ret = self.upload_file_internal(file_hash, sources[file_hash][2].encode())
            if ret != 200:
                return ret

        compact = {code_hash: (file_name, module, req) for code_hash, (file_name, module, _, req) in sources.items()}

        code = data.Code(environment=env.id, version=code_id, resource=resource, source_refs=compact, sources={})
        yield code.insert()

        return 200

    @protocol.handle(methods.CodeBatchedMethod.upload_code_batched, code_id="id", env="tid")
    @gen.coroutine
    def upload_code_batched(self, env, code_id, resources):
        # validate
        for rtype, sources in resources.items():
            if not isinstance(rtype, str):
                return 400, {"message": "all keys in the resources map must be strings"}
            if not isinstance(sources, dict):
                return 400, {"message": "all values in the resources map must be dicts"}
            for name, refs in sources.items():
                if not isinstance(name, str):
                    return 400, {"message": "all keys in the sources map must be strings"}
                if not isinstance(refs, (list, tuple)):
                    return 400, {"message": "all values in the sources map must be lists or tuples"}
                if len(refs) != 3 or \
                        not isinstance(refs[0], str) or \
                        not isinstance(refs[1], str) or \
                        not isinstance(refs[2], list):
                    return 400, {"message": "The values in the source map should be of the"
                                            " form (filename, module, [requirements])"}

        allrefs = [ref for sourcemap in resources.values() for ref in sourcemap.keys()]

        ret, val = yield self.stat_files(allrefs)

        if ret != 200:
            return ret, val

        if len(val["files"]) != 0:
            return 400, {"message": "Not all file references provided are valid", "references": val["files"]}

        code = yield data.Code.get_versions(environment=env.id, version=code_id)
        oldmap = {c.resource: c for c in code}

        new = {k: v for k, v in resources.items() if k not in oldmap}
        conflict = [k for k, v in resources.items() if k in oldmap and oldmap[k].source_refs != v]

        if len(conflict) > 0:
            return 500, {"message": "Some of these items already exist, but with different source files",
                         "references": conflict}

        newcodes = [data.Code(environment=env.id, version=code_id, resource=resource, source_refs=hashes)
                    for resource, hashes in new.items()]

        yield data.Code.insert_many(newcodes)

        return 200

    @protocol.handle(methods.CodeMethod.get_code, code_id="id", env="tid")
    @gen.coroutine
    def get_code(self, env, code_id, resource):
        code = yield data.Code.get_version(environment=env.id, version=code_id, resource=resource)
        if code is None:
            return 404, {"message": "The version of the code does not exist."}

        return 200, {"version": code_id, "environment": env.id, "resource": resource, "sources": code.sources}
        if code.sources is not None:
            sources = dict(code.sources)
        else:
            sources = {}

        if code.source_refs is not None:
            for code_hash, (file_name, module, req) in code.source_refs.items():
                ret, c = self.get_file_internal(code_hash)
                if ret != 200:
                    return ret, c
                sources[code_hash] = (file_name, module, c.decode(), req)

        return 200, {"version": code_id, "environment": env.id, "resource": resource, "sources": sources}

    @protocol.handle(methods.ResourceMethod.resource_action_update, env="tid")
    @gen.coroutine
11 changes: 11 additions & 0 deletions src/inmanta/util.py
@@ -21,6 +21,7 @@

from pkg_resources import DistributionNotFound
import pkg_resources
import hashlib


LOGGER = logging.getLogger(__name__)
@@ -48,3 +49,13 @@ def get_compiler_version():
"Could not find version number for the inmanta compiler." +
"Is inmanta installed? Use stuptools install or setuptools dev to install.")
return None


def hash_file(content):
    """
    Create a hash from the given content
    """
    sha1sum = hashlib.new("sha1")
    sha1sum.update(content)

    return sha1sum.hexdigest()
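A quick usage sketch (the sample source string is made up): the compiler and the server now share this helper, so the hex digest a client computes is the same id the server checks on upload and download:

# Illustrative only: hash plugin source the way upload_file/stat_files expect it.
source_code = "def handler():\n    pass\n"
file_id = hash_file(source_code.encode())  # 40-character sha1 hex digest, used as the file id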