90 lines
2.3 KiB
JavaScript
90 lines
2.3 KiB
JavaScript
const Factory = require('../factory')
|
|
const transform = require('./transform')
|
|
const Utils = require('./utils')
|
|
|
|
//
|
|
// Initialize the child logger for
|
|
// the module
|
|
//
|
|
const log = require('../log').child({
|
|
module: 'directChannels'
|
|
})
|
|
|
|
module.exports = function(context) {
|
|
return new Promise(function(resolve /*, reject */) {
|
|
log.info('streaming records')
|
|
//
|
|
// Array to accumulate the channel
|
|
// member pairs
|
|
//
|
|
var channels = {}
|
|
//
|
|
// Query messages from Jabber and pipe
|
|
// through the post transform and
|
|
// then to the output. We use pipe to
|
|
// handle very large data sets using
|
|
// streams
|
|
//
|
|
context.jabber.pipe(
|
|
//
|
|
// Define the query
|
|
//
|
|
`SELECT DISTINCT to_jid, from_jid FROM dbo.jm
|
|
WHERE msg_type = 'c'
|
|
AND direction = 'I'
|
|
AND (body_string != '' or datalength(body_text) > 0)`,
|
|
//
|
|
// Define the tranform
|
|
//
|
|
transform(function(result, encoding, callback) {
|
|
try {
|
|
log.debug(result)
|
|
//
|
|
// Generate the members array for the
|
|
// direct channel
|
|
//
|
|
let members = Utils.members(
|
|
context.values.users,
|
|
result.to_jid,
|
|
result.from_jid
|
|
)
|
|
//
|
|
// If there at least two members
|
|
//
|
|
if(Utils.membersAreValid(members)) {
|
|
var key = `${members[0]}|${members[1]}`
|
|
//
|
|
// And we haven't processed this pair
|
|
// already
|
|
//
|
|
if(!channels[key]) {
|
|
channels[key] = members
|
|
log.info(`... writing ${members}`)
|
|
context.output.write(
|
|
Factory.directChannel({
|
|
members
|
|
})
|
|
)
|
|
}
|
|
}
|
|
} catch (err) {
|
|
log.error(`... ignoring directChannel from: ${result.from_jid} to: ${result.to_jid} on error: ${err.message}.`)
|
|
}
|
|
//
|
|
// Invoke the call to mark that we are
|
|
// done with the chunk
|
|
//
|
|
return callback()
|
|
},
|
|
//
|
|
// Define the callback to be invoked
|
|
// on finish
|
|
//
|
|
function() {
|
|
log.info('... finished')
|
|
resolve(context)
|
|
})
|
|
)
|
|
})
|
|
}
|