diff --git a/data/queries_test.json b/data/queries_test.json index dbac1cf..c5b04da 100644 --- a/data/queries_test.json +++ b/data/queries_test.json @@ -1,4 +1,11 @@ [ "2 tone", - "2-step garage" + "2-step garage", + "4-beat", + "A cappella", + "A-F", + "Acid house", + "Acid jazz", + "Acid rock", + "Acoustic" ] \ No newline at end of file diff --git a/lib/index.js b/lib/index.js index 48a60d3..7505728 100644 --- a/lib/index.js +++ b/lib/index.js @@ -12,10 +12,17 @@ var remaining_thread_count = Number(process.env.THREAD_COUNT) || 150; const tasks = {reflexGroups, reflexUsers, reflexTracks}; export async function init() { + console.log('starting on ' + new Date()); + await cypher('CREATE CONSTRAINT ON (group:VK_Group) ASSERT group.id IS UNIQUE'); + await cypher('CREATE CONSTRAINT ON (post:VK_Wall_Post) ASSERT post.id IS UNIQUE'); + await cypher('CREATE CONSTRAINT ON (track:VK_Track) ASSERT track.id IS UNIQUE'); + await cypher('CREATE CONSTRAINT ON (user:VK_User) ASSERT user.id IS UNIQUE'); + await cypher('CREATE CONSTRAINT ON (artist:Artist) ASSERT artist.id IS UNIQUE'); for (let query of queries) await addTask(['reflexGroups', query]); while (remaining_thread_count--) - await start_thread(); + /*important! not an async call, that's an fibering*/ + start_thread(); } //noinspection InfiniteRecursionJS @@ -23,8 +30,10 @@ async function start_thread() { try { await (await getNextTask()); } catch (e) { - console.log(e, e.stack); - process.exit(1); + console.log(e); + console.log(e.stack); + process.nextTick(()=> process.exit(1)); + throw e; } finally { await start_thread() } diff --git a/lib/reflectors/groups_reflection.js b/lib/reflectors/groups_reflection.js index 087114f..364f23a 100644 --- a/lib/reflectors/groups_reflection.js +++ b/lib/reflectors/groups_reflection.js @@ -5,7 +5,6 @@ import callVK from '../utils/VK_stack_caller.js'; //noinspection JSUnusedGlobalSymbols export default async function reflexGroupsFromQuery(query) { let {items: groups} = await callVK('groups.search', {q: query, type: 'page', count: 1000}); - console.log(groups); if (Array.isArray(groups)) await cypher( `FOREACH (id in {ids} | MERGE (group:VK_Group {id: id}))`, diff --git a/lib/reflectors/tracks_reflection.js b/lib/reflectors/tracks_reflection.js index d684b4a..95ffd8d 100644 --- a/lib/reflectors/tracks_reflection.js +++ b/lib/reflectors/tracks_reflection.js @@ -18,16 +18,23 @@ export default async function reflexTracksForGroup(group_id, remainingTrackCount remainingTrackCount -= tracks.length; remainingWallPostCount -= posts.length; await cypher(` - MATCH (group:VK_Group {id: ${group_id} }) - MERGE (post:VK_Wall_Post { id: ${post.id}, date: ${post.date} }) - CREATE UNIQUE (group)-[:Published]-(post) - FOREACH (track IN {tracks} | - MERGE (trackNode:VK_Track {id: track.id, title: track.title, duration: track.duration}) - MERGE (artist:Artist {name: track.artist}) - CREATE UNIQUE (post)-[:Attached]->(trackNode)<-[:Performed]-(artist) - ) - `, +MATCH (group:VK_Group {id: {group_id} }) +MERGE (post:VK_Wall_Post { id: {post_id}}) + ON CREATE SET post.date={post_date} + +MERGE (group)-[:Published]->(post) +FOREACH (track IN {tracks} | + MERGE (trackNode:VK_Track {id: track.id}) + ON CREATE SET trackNode.title=track.title, + trackNode.duration=track.duration + MERGE (artist:Artist {name: track.artist}) + MERGE (trackNode)<-[:Performed]-(artist) + MERGE (post)-[:Attached]->(trackNode) +)`, { + group_id, + post_id: post.id, + post_date: post.date, tracks: tracks.map(track => ({ id : track.id, title : track.title, diff --git a/lib/reflectors/users_reflection.js b/lib/reflectors/users_reflection.js index 9eefa59..062457f 100644 --- a/lib/reflectors/users_reflection.js +++ b/lib/reflectors/users_reflection.js @@ -9,12 +9,11 @@ export default async function reflexUsersForGroup(group_id) { do { var {items: users} = await callVK('groups.getMembers', {group_id: group_id, count, offset: cursor++ * count}); await cypher(` - MATCH (g:VK_Group { id: ${group_id} }) - FOREACH (id IN {users} | - MERGE (u:VK_User { id: id }) - CREATE UNIQUE (u)-[r:Member]->(g) - ) - `, - {users}); +MATCH (g:VK_Group { id: {group_id} }) +FOREACH (id IN {users} | + MERGE (u:VK_User { id: id }) + CREATE UNIQUE (u)-[:Member]->(g) +)`, + {group_id, users}); } while (users.length !== 0) } \ No newline at end of file diff --git a/lib/utils/VK_stack_caller.js b/lib/utils/VK_stack_caller.js index 7c6ca64..66efb4c 100644 --- a/lib/utils/VK_stack_caller.js +++ b/lib/utils/VK_stack_caller.js @@ -30,13 +30,15 @@ async function loop() { ${queries.map(q => `result.push(API.${q.method}(${JSON.stringify(q.args, replace_quotes)}));`).join('\n')} return result;`; call('execute', {v, code}) - .then(({execute_errors, error, response = []}) => + .then(({execute_errors, error, response = []}) => { + console.log(execute_errors, error, response.length, stack.length); stack.push(...queries.filter((query, index) => { if (response[index] || response[index] === '') query.resolve(response[index]); else return true; - }))); + })) + }); await new Promise(res => setTimeout(res, 450)); await loop(); diff --git a/lib/utils/graph_connection.js b/lib/utils/graph_connection.js index 2075c6d..2e1d172 100644 --- a/lib/utils/graph_connection.js +++ b/lib/utils/graph_connection.js @@ -10,6 +10,6 @@ export default function cypher(query, params) { }; -export function escape(string) { +export function escape(string: string) { return '"' + string.replace(/"/g, '\\"') + '"'; } \ No newline at end of file diff --git a/lib/utils/task_queue.js b/lib/utils/task_queue.js index 7020be5..e60b3c2 100644 --- a/lib/utils/task_queue.js +++ b/lib/utils/task_queue.js @@ -9,7 +9,7 @@ const db = new Redis(process.env.REDIS_HOST); const taskPresenceKey = 'graph_dump:tasks:presenceHash'; const taskOrderKey = 'graph_dump:tasks:presenceList'; -export async function addTask(taskData){ +export async function addTask(taskData: object){ if (taskData instanceof Object) { var taskString = JSON.stringify(taskData); var exists = await db.hsetnx(taskPresenceKey, taskString, 1);