Skip to content

Commit

Permalink
sipstore: create SIPs on record create/update
Browse files Browse the repository at this point in the history
Signed-off-by: Pamfilos Fokianos <[email protected]>
  • Loading branch information
pamfilos committed Mar 26, 2019
1 parent 4d9b3e9 commit 56fe4f5
Show file tree
Hide file tree
Showing 17 changed files with 744 additions and 40 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,6 @@ target/

# Jetbrains editor project folder
.idea/

# SIPStore
archive/
3 changes: 3 additions & 0 deletions cernopendata/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,3 +435,6 @@
**params
)
]

SIPSTORE_CHECKSUM_ALGORITHM = 'adler32'

166 changes: 134 additions & 32 deletions cernopendata/modules/fixtures/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,19 @@
from invenio_pidstore.models import PersistentIdentifier
from invenio_records_files.api import Record
from invenio_records_files.models import RecordsBuckets
from invenio_sipstore.models import SIPMetadataType
from sqlalchemy.orm.attributes import flag_modified

from cernopendata.modules.records.minters.docid import \
cernopendata_docid_minter
from cernopendata.modules.records.minters.recid import \
cernopendata_recid_minter

from .sip_utils import (
handle_sipstore_record_file_index,
handle_sipstore_record_file,
sip_record,
)

def get_jsons_from_dir(dir):
"""Get JSON files inside a dir."""
Expand All @@ -55,8 +61,9 @@ def get_jsons_from_dir(dir):
return res


def handle_record_files(data, bucket, files, skip_files):
def handle_record_files(data, bucket, files, skip_files, skip_sips):
"""Handles record files."""
sip_files = []
for file in files:
if skip_files:
break
Expand Down Expand Up @@ -89,45 +96,80 @@ def handle_record_files(data, bucket, files, skip_files):
str(e)))
continue

if not skip_sips:
if file.get("type", None) == "index.json":
sip_files += handle_sipstore_record_file_index(f)

def create_record(schema, data, files, skip_files):
return sip_files


def handle_sip_files(files, skip_files, skip_sips):
    """Collect SIP file content for the given record file entries.

    For every file entry that resolves to an existing ``FileInstance``
    and is of type ``index.json``, gather the SIP content produced by
    ``handle_sipstore_record_file_index``.

    :param files: iterable of file dicts; each entry must carry the
        'uri', 'size' and 'checksum' keys.
    :param skip_files: when True, skip file handling entirely and
        return an empty list.
    :param skip_sips: when True, do not collect any SIP content.
    :returns: list of SIP file content entries (possibly empty).
    """
    sip_files = []
    if skip_files:
        # Nothing to do — mirrors the per-iteration `break` the sibling
        # helpers use, but stated once up front.
        return sip_files

    for file_entry in files:
        # Fail fast on malformed fixture entries.
        # NOTE(review): `assert` is stripped under `python -O`; raising
        # ValueError would be safer — kept as assert for consistency
        # with the sibling helpers in this module.
        assert 'uri' in file_entry
        assert 'size' in file_entry
        assert 'checksum' in file_entry
        f = FileInstance.get_by_uri(file_entry.get("uri"))

        # Only index files of already-known file instances contribute
        # content to the SIP.
        if f and not skip_sips and file_entry.get("type") == "index.json":
            sip_files += handle_sipstore_record_file_index(f)

    return sip_files


def create_record(schema, data, files, skip_files, skip_sips):
    """Create a new record together with its files bucket and SIP content.

    :param schema: JSON schema URL stored into the record's ``$schema``.
    :param data: record metadata dict (mutated: ``$schema`` is set).
    :param files: list of file dicts to attach to the record.
    :param skip_files: when True, no bucket is created and no files are
        attached.
    :param skip_sips: forwarded to the file handler so SIP content
        collection can be skipped.
    :returns: tuple ``(pid, record, sip_files_content)``.
    """
    id = uuid.uuid4()
    pid = cernopendata_recid_minter(id, data)

    data['$schema'] = schema
    record = Record.create(data, id_=id)

    # Default to an empty list so the return statement is well-defined
    # even when file handling is skipped (previously raised NameError
    # when skip_files was True).
    sip_files_content = []
    if not skip_files:
        bucket = Bucket.create()
        sip_files_content = handle_record_files(
            data, bucket, files, skip_files, skip_sips)

        RecordsBuckets.create(
            record=record.model, bucket=bucket)

    return pid, record, sip_files_content


def update_record(pid, schema, data, files, skip_files):
def update_record(pid, schema, data, files, skip_files, skip_sips):
    """Update an existing record's metadata and collect its SIP content.

    The record's existing bucket and files are left untouched; only the
    metadata is updated and SIP file content is (re)collected from the
    provided file entries.

    :param pid: persistent identifier resolving to the record.
    :param schema: JSON schema URL; currently unused here, kept for
        signature parity with ``create_record``.
    :param data: new record metadata merged into the record.
    :param files: list of file dicts to collect SIP content from.
    :param skip_files: when True, skip SIP file collection.
    :param skip_sips: when True, skip SIP content collection.
    :returns: tuple ``(record, sip_files_content)``.
    """
    record = Record.get_record(pid.object_uuid)
    record.update(data)

    sip_files_content = []
    if not skip_files:
        sip_files_content = handle_sip_files(
            files,
            skip_files,
            skip_sips,
        )
    return record, sip_files_content


def create_doc(data, schema):
Expand Down Expand Up @@ -156,6 +198,8 @@ def fixtures():
@fixtures.command()
@click.option('--skip-files', is_flag=True, default=False,
help='Skip loading of files')
@click.option('--skip-sips', is_flag=True, default=False,
help='Skip create/update of SIPs')
@click.option('files', '--file', '-f', multiple=True,
type=click.Path(exists=True),
help='Path to the file(s) to be loaded. If not provided, all'
Expand All @@ -165,8 +209,9 @@ def fixtures():
@click.option('--mode', required=True, type=click.Choice(
['insert', 'replace', 'insert-or-replace']))
@with_appcontext
def records(skip_files, files, profile, mode):
def records(skip_files, skip_sips, files, profile, mode):
"""Load all records."""

if profile:
import cProfile
import pstats
Expand All @@ -187,31 +232,34 @@ def records(skip_files, files, profile, mode):
else:
record_json = glob.glob(os.path.join(data, '*.json'))


for filename in record_json:
# name = filename.split('/')[-1]
# if name.startswith('opera'):
# click.echo('Skipping opera records ...')
# continue

click.echo('Loading records from {0} ...'.format(filename))
with open(filename, 'rb') as source:
for data in json.load(source):

if not data:
click.echo('IGNORING a possibly broken or corrupted '
'record entry in file {0} ...'.format(filename))
continue

files = data.get('files', [])

pid = None
if mode == 'insert-or-replace':
try:
pid = PersistentIdentifier.get('recid', data['recid'])
if pid:
record = update_record(
pid, schema, data, files, skip_files)
record, sip_files_content = update_record(
pid, schema, data, files, skip_files, skip_sips)
action = 'updated'
except PIDDoesNotExistError:
record = create_record(schema, data, files, skip_files)
pid, record, sip_files_content = create_record(
schema, data, files, skip_files, skip_sips)
action = 'inserted'
elif mode == 'insert':
try:
Expand All @@ -223,7 +271,8 @@ def records(skip_files, files, profile, mode):
data.get('recid')), err=True)
return
except PIDDoesNotExistError:
record = create_record(schema, data, files, skip_files)
pid, record, sip_files_content = create_record(
schema, data, files, skip_files, skip_sips)
action = 'inserted'
else:
try:
Expand All @@ -234,13 +283,20 @@ def records(skip_files, files, profile, mode):
'cannot replace it.'.format(
data.get('recid')), err=True)
return
record = update_record(
pid, schema, data, files, skip_files)
record, sip_files_content = update_record(
pid, schema, data, files, skip_files, skip_sips)
action = 'updated'



if not skip_files:
record.files.flush()
record.commit()

if not skip_sips:
sip_record(pid, record, sip_files_content, action)
# sip_record(pid, record, ''.join(sip_files_content), action)

db.session.commit()
click.echo(
'Record recid {0} {1}.'.format(
Expand Down Expand Up @@ -462,3 +518,49 @@ def pids():
db.session.add(record)
db.session.commit()
db.session.expunge_all()



@fixtures.command()
@with_appcontext
def sipmetadata():
    """Load the SIP metadata types required by the SIPStore module."""
    # Hoist the extension lookup — it is invariant across entries.
    jsonschemas = current_app.extensions['invenio-jsonschemas']
    data = [
        {
            "title": "CERN Open Data Record JSON",
            "name": "record-json",
            "format": "json",
            "schema": jsonschemas.path_to_url(
                'records/record-v1.0.0.json'),
        },
        {
            "title": "CERN Open Data Docs JSON",
            "name": "docs-json",
            "format": "json",
            "schema": jsonschemas.path_to_url(
                'records/docs-v1.0.0.json'),
        },
        {
            "title": "CERN Open Data Glossary JSON",
            "name": "glossary-json",
            "format": "json",
            "schema": jsonschemas.path_to_url(
                'records/glossary-term-v1.0.0.json'),
        },
        {
            "title": "BagIt Archiver metadata",
            "name": "bagit",
            "format": "json",
            "schema": jsonschemas.path_to_url(
                'sipstore/bagit-v1.0.0.json'),
        },
    ]

    click.secho('Loading SIP metadata types...', fg='blue')
    with click.progressbar(data) as types:
        with db.session.begin_nested():
            # `metadata_type` avoids shadowing the builtin `type`.
            for metadata_type in types:
                db.session.add(SIPMetadataType(**metadata_type))
    db.session.commit()
    click.secho('SIP metadata types loaded!', fg='green')

Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
[
{
"abstract": {
"description": "Sample event set from /BTag/Run2011A-12Oct2013-v1/AOD primary dataset in json format readable from the browser-based 3d event display"
},
"accelerator": "CERN-LHC",
"authors": [
{
"name": "McCauley, Thomas"
}
],
"collections": [
"CMS-Derived-Datasets"
],
"collision_information": {
"energy": "7TeV",
"type": "pp"
},
"date_created": [
"2011"
],
"date_published": "2016",
"distribution": {
"formats": [
"ig"
],
"number_events": 25
},
"doi": "10.7483/OPENDATA.CMS.BK3T.NKC6",
"experiment": "CMS",
"files": [
{
"checksum": "sha1:a100d93d3b27def7954fa5827e9717ab4ec407cc",
"size": 2411180,
"uri": "root://eospublic.cern.ch//eos/opendata/cms/Run2011A/BTag/IG/12Oct2013-v1/BTag.ig"
}
],
"methodology": {
"description": "These files contain the objects to be displayed with the online event display. No event selection, apart accepting only the validated runs, is applied. The software to produce these files is available in:",
"links": [
{
"recid": "550"
}
]
},
"note": {
"description": "No selection or quality criteria have been applied on the individual physics objects, apart from accepting only the validated runs"
},
"publisher": "CERN Open Data Portal",
"recid": "614",
"relations": [
{
"doi": "10.7483/OPENDATA.CMS.N372.QF6S",
"recid": "15",
"title": "/BTag/Run2011A-12Oct2013-v1/AOD",
"type": "isChildOf"
}
],
"run_period": [
"Run2011A"
],
"system_details": {
"global_tag": "FT_53_LV5_AN1::All",
"release": "CMSSW_5_3_30"
},
"title": "Event display file derived from /BTag/Run2011A-12Oct2013-v1/AOD",
"type": {
"primary": "Dataset",
"secondary": [
"Derived"
]
},
"usage": {
"description": "The data can be accessed from the file menu of the online event display",
"links": [
{
"description": "Explore and visualise events",
"url": "/visualise/events/CMS"
}
]
}
}
]
Loading

0 comments on commit 56fe4f5

Please sign in to comment.