init
This commit is contained in:
106
lib/modules/directPosts.js
Normal file
106
lib/modules/directPosts.js
Normal file
@@ -0,0 +1,106 @@
|
||||
const Factory = require('../factory')
|
||||
const transform = require('./transform')
|
||||
const Utils = require('./utils')
|
||||
|
||||
//
|
||||
// Initialize the child logger for
|
||||
// the module
|
||||
//
|
||||
const log = require('../log').child({
|
||||
module: 'directPosts'
|
||||
})
|
||||
|
||||
module.exports = function(context) {
|
||||
return new Promise(function(resolve /*, reject */) {
|
||||
log.info('streaming records')
|
||||
//
|
||||
// Keep track of the number of posts
|
||||
// written for logging
|
||||
//
|
||||
var written = 0
|
||||
//
|
||||
// Query messages from Jabber and pipe
|
||||
// through the post transform and
|
||||
// then to the output. We use pipe to
|
||||
// handle very large data sets using
|
||||
// streams
|
||||
//
|
||||
context.jabber.pipe(
|
||||
//
|
||||
// Define the query
|
||||
//
|
||||
`SELECT to_jid, from_jid, sent_date, body_string, body_text FROM dbo.jm
|
||||
WHERE msg_type = 'c'
|
||||
AND direction = 'I'
|
||||
AND (body_string != '' or datalength(body_text) > 0)`,
|
||||
//
|
||||
// Define the tranform
|
||||
//
|
||||
transform(function(message, encoding, callback) {
|
||||
try {
|
||||
log.debug(message)
|
||||
//
|
||||
// Generate the members array for the
|
||||
// direct channel
|
||||
//
|
||||
let members = Utils.members(
|
||||
context.values.users,
|
||||
message.to_jid,
|
||||
message.from_jid
|
||||
)
|
||||
//
|
||||
// Ensure we have at least two channel
|
||||
// members before we can write the message
|
||||
//
|
||||
if (Utils.membersAreValid(members)) {
|
||||
//
|
||||
// Base post props
|
||||
//
|
||||
var post = {
|
||||
channel_members: members,
|
||||
user: Utils.username(context.values.users, message.from_jid),
|
||||
create_at: Utils.millis(message.sent_date)
|
||||
}
|
||||
//
|
||||
// Process each chunk
|
||||
//
|
||||
Utils.body(message).forEach(function(chunk) {
|
||||
context.output.write(
|
||||
Factory.directPost(Object.assign({}, post, {
|
||||
message: chunk
|
||||
}))
|
||||
)
|
||||
//
|
||||
// Log progress periodically
|
||||
//
|
||||
written += 1
|
||||
if(written % 1000 == 0) {
|
||||
log.info(`... wrote ${written} posts`)
|
||||
}
|
||||
})
|
||||
} else {
|
||||
log.warn({
|
||||
to_jid: message.to_jid,
|
||||
from_jid: message.from_jid
|
||||
}, '... skipping message with invalid channel members')
|
||||
}
|
||||
} catch (err) {
|
||||
log.error(`... ignoring directPost from: ${message.from_jid} to: ${message.to_jid} on error: ${err.message}.`)
|
||||
}
|
||||
//
|
||||
// Invoke the call to mark that we are
|
||||
// done with the chunk
|
||||
//
|
||||
return callback()
|
||||
},
|
||||
//
|
||||
// Define the callback to be invoked
|
||||
// on finish
|
||||
//
|
||||
function() {
|
||||
log.info(`... finished writing ${written} posts`)
|
||||
resolve(context)
|
||||
})
|
||||
)
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user