Files
2026-05-13 20:03:28 +03:00

101 lines
2.5 KiB
JavaScript

const Factory = require('../factory')
const transform = require('./transform')
const Utils = require('./utils')
//
// Initialize the child logger for
// the module
//
const log = require('../log').child({
module: 'posts'
})
module.exports = function(context) {
return new Promise(function(resolve /*, reject */) {
log.info('streaming records')
//
// Keep track of the number of posts
// written for logging
//
var written = 0
//
// Query messages from Jabber and pipe
// through the post transform and
// then to the output. We use pipe to
// handle very large data sets using
// streams
//
context.jabber.pipe(
//
// Define the query
//
`SELECT
msg_id,
to_jid,
from_jid,
sent_date,
body_string,
body_text
FROM dbo.tc_msgarchive WHERE msg_type = 'g' AND (body_string != '' OR datalength(body_text) > 0) `,
//
// Define the tranform
//
transform(function(message, encoding, callback) {
try {
log.debug(message)
//
// Base post props
//
var post = {
team: context.values.team.name,
channel: Utils.channelName(
context.values.channels, message.to_jid
),
user: Utils.username(
context.values.users, message.from_jid
),
create_at: Utils.millis(message.sent_date)
}
//
// Process each chunk
//
Utils.body(message).forEach(function(chunk) {
//
// Write the post object to the
// output
//
context.output.write(
Factory.post(Object.assign({}, post, {
message: chunk
}))
)
//
// Log progress periodically
//
written += 1
if(written % 1000 == 0) {
log.info(`... wrote ${written} posts`)
}
})
}
catch(err) {
log.error(`... ignoring message id:${message.msg_id} on error: ${err.message}.`)
}
//
// Invoke the call to mark that we are
// done with the chunk
//
return callback()
},
//
// Define the callback to be invoked
// on finish
//
function() {
log.info(`... finished writing ${written} posts`)
resolve(context)
})
)
})
}