Skip to content

Commit

Permalink
feat: send hash to datalayer get_keys_values
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelTaylor3D committed Feb 21, 2022
1 parent 0ac739f commit 1d02610
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 21 deletions.
14 changes: 10 additions & 4 deletions src/datalayer/persistance.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,19 @@ export const getRoot = async (storeId, ignoreEmptyStore = false) => {
}
};

export const getStoreData = async (storeId) => {
export const getStoreData = async (storeId, rootHash) => {
if (storeId) {
const payload = {
id: storeId,
};

if (rootHash) {
payload.root_hash = rootHash;
}

const options = {
url: `${rpcUrl}/get_keys_values`,
body: JSON.stringify({
id: storeId,
}),
body: JSON.stringify(payload),
};

const response = await request(
Expand Down
44 changes: 29 additions & 15 deletions src/datalayer/syncService.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@ const frames = ['-', '\\', '|', '/'];

console.log('Start Datalayer Update Polling');
const startDataLayerUpdatePolling = async () => {
const storeIdsToUpdate = await dataLayerWasUpdated();
if (storeIdsToUpdate.length) {
const updateStoreInfo = await dataLayerWasUpdated();
if (updateStoreInfo.length) {
await Promise.all(
storeIdsToUpdate.map(async (storeId) => {
updateStoreInfo.map(async (store) => {
logUpdate(
`Updates found syncing storeId: ${storeId} ${
`Updates found syncing storeId: ${store.storeId} ${
frames[Math.floor(Math.random() * 3)]
}`,
);
await syncDataLayerStoreToClimateWarehouse(storeId);
await syncDataLayerStoreToClimateWarehouse(
store.storeId,
store.rootHash,
);
}),
);
} else {
Expand All @@ -33,15 +36,24 @@ const startDataLayerUpdatePolling = async () => {
setTimeout(() => startDataLayerUpdatePolling(), POLLING_INTERVAL);
};

const syncDataLayerStoreToClimateWarehouse = async (storeId) => {
const syncDataLayerStoreToClimateWarehouse = async (storeId, rootHash) => {
let storeData;

if (process.env.USE_SIMULATOR === 'true') {
storeData = await simulator.getStoreData(storeId);
storeData = await simulator.getStoreData(storeId, rootHash);
} else {
storeData = await dataLayer.getStoreData(storeId);
storeData = await dataLayer.getStoreData(storeId, rootHash);
}

if (!_.get(storeData, 'keys_values', []).length) {
return;
}

await Organization.update(
{ registryHash: rootHash },
{ where: { registryId: storeId } },
);

const organizationToTrucate = await Organization.findOne({
attributes: ['orgUid'],
where: { registryId: storeId },
Expand Down Expand Up @@ -86,7 +98,7 @@ const syncDataLayerStoreToClimateWarehouse = async (storeId) => {
await Staging.cleanUpCommitedAndInvalidRecords();
}
} catch (error) {
console.trace('ERROR DURING SYNC TRANSACTION', error);
console.trace('ERROR DURING SYNC TRANSACTION', error, '!!!', storeData);
}
};

Expand Down Expand Up @@ -136,21 +148,21 @@ const dataLayerWasUpdated = async () => {
return [];
}

const updatedStoreIds = await Promise.all(
const updateStoreInfo = await Promise.all(
updatedStores.map(async (rootHash) => {
const storeId = rootHash.id.replace('0x', '');

// update the organization with the new hash
await Organization.update(
/* await Organization.update(
{ registryHash: rootHash.hash },
{ where: { registryId: storeId } },
);
);*/

return storeId;
return { storeId, rootHash: rootHash.hash };
}),
);

return updatedStoreIds;
return updateStoreInfo;
};

const subscribeToStoreOnDataLayer = async (storeId, ip, port) => {
Expand Down Expand Up @@ -200,7 +212,9 @@ const getSubscribedStoreData = async (
encodedData = await dataLayer.getStoreData(storeId);
}

if (!encodedData) {
console.log('!!!!', encodedData?.keys_values);

if (_.isEmpty(encodedData?.keys_values)) {
console.log(`Retrying...`, retry + 1);
console.log('...');
await new Promise((resolve) => setTimeout(() => resolve(), 30000));
Expand Down
12 changes: 11 additions & 1 deletion src/models/organizations/organizations.model.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,17 @@ class Organization extends Model {
process.env.USE_SIMULATOR === 'true'
? 'f1c54511-865e-4611-976c-7c3c1f704662'
: await datalayer.createDataLayerStore();

console.log({
newOrganizationId,
});
const newRegistryId = await datalayer.createDataLayerStore();
console.log({
newRegistryId,
});
const registryVersionId = await datalayer.createDataLayerStore();
console.log({
registryVersionId,
});

// sync the organization store
await datalayer.syncDataLayer(newOrganizationId, {
Expand Down Expand Up @@ -88,6 +96,8 @@ class Organization extends Model {
try {
const orgData = await datalayer.getSubscribedStoreData(orgUid, ip, port);

console.log(orgData);

if (!orgData.registryId) {
throw new Error(
'Currupted organization, no registryId on the datalayer, can not import',
Expand Down
2 changes: 1 addition & 1 deletion src/utils/data-loaders.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export const serverAvailable = async (server, port) => {
return true;
} catch (err) {
if (JSON.stringify(err).includes('Python')) {
console.log('SERVER IS AVAILABLE');
console.log('SERVER IS AVAILABLE', server);
return true;
} else {
return false;
Expand Down
1 change: 1 addition & 0 deletions src/utils/datalayer-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export const decodeHex = (str) => {
};

export const decodeDataLayerResponse = (data) => {
console.log(data);
return data.keys_values.map((item) => ({
key: decodeHex(item.key),
value: decodeHex(item.value),
Expand Down

0 comments on commit 1d02610

Please sign in to comment.