IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    有关event-stream中内存泄漏的问题

    xGreylee发表于 2017-02-07 03:59:16
    love 0

    最近被这个内存泄漏的问题折腾死了,根据GIT上面event-stream的文档稍微改动了一下代码。主要原因是存放cities的源文件信息量比较大,可读流中存放了大量的数据而用来写入数据的commit速度跟不上,导致内存爆满,希望能通过pause来解决,但是调试了很久发现貌似流一开始就是按时序执行的,pause并没有起作用,运行时占用内存还是持续飙升最后溢出然后程序终止。。不知道pause究竟要怎么用,还是node新手,求大神们指点一下!!谢谢

    var _ = require('underscore')
    var es = require('event-stream')
    var fs = require('fs')
    var events = require('events')
    var fibrous = require('fibrous')
    var config = require('../config.js')
    var db = require('../common/db.js')
    	// var mysql = require('./common/mysql.js')
    var distance = require('../common/distance.js')
    
    var emitter = new events.EventEmitter()
    var timestamp = Date.now()
    
    function log_and_time(str) {
    	var now = Date.now()
    	console.log(str, '[time used', now - timestamp, 'ms]')
    	timestamp = now
    }
    
    function start() {
    	console.log('init')
    	db.init('city')
    	setTimeout(function() {
    		// db.city.drop(function(err){
    		//      if(err) console.log(err)
    		db.city.ensureIndex({
    			"geo": "2d"
    		}, db.default_db_callback)
    		log_and_time('finished init')
    		emitter.emit('index_provinces')
    			// emitter.emit('statistic')
    			// })
    	}, 100)
    }
    start()
    
    emitter.once('index_provinces', function() {
    	var provinces = {}
    	console.log('index city names')
    	fs.createReadStream(config.raw.province, {
    		flags: 'r'
    	}).pipe(es.split()).pipe(es.through(function(line) {
    		var row = line.split('\t')
    		var city = {
    			code: row[0].split('.'),
    			name: row[1]
    		}
    		provinces[city.code[0]] = provinces[city.code[0]] || {}
    		provinces[city.code[0]][city.code[1]] = city.name
    	})).pipe(es.wait(function(err, text) {
    		log_and_time('finished indexing city names')
    		emitter.emit('index_city_locations', provinces)
    			//db.mongodb.close()
    	}))
    })
    
    
    
    emitter.once('index_city_locations', function(provinces) {
    	console.log('index city locations')
    	var ps = es.pause()
    	var cities = []
    	var line_number = 0
    	var count = 0
    		//this method is non-block
    	function commit(cities, callback) {
    		console.log('commiting', cities.length, 'docs')
    		db.city.insert(cities, callback || db.default_db_callback)
    	}
    	fs.createReadStream(config.raw.city, {
    		flags: 'r'
    	}).pipe(es.split()).pipe(es.through(function(line) {
    		var row = line.split('\t')
    		line_number++
    		var city = {}
    		city.name = row[1]
    		city.asciiname = row[2]
    		city.alias = row[3]
    		city.geo = {
    			lat: Number(row[4]),
    			lng: Number(row[5])
    		}
    		city.country_code = row[8]
    		city.province_code = row[10]
    		city.province_name = provinces[city.country_code] ? provinces[city.country_code][city.province_code] : undefined
    		if (city.province_name)
    			cities.push(city)
    		else {
    			console.log('warn province name not found', city.country_code, city.province_code, city.name, 'in line', line_number)
    				//console.log(row)
    		}
    		if (cities.length >= 20000) {
    			count++
    			ps.pause()
    			console.log('This is No.', count)
    			commit(cities, function() {
    				ps.resume()
    			})
    			cities = []
    		}
    	})).pipe(es.wait(function(err, text) {
    		if (cities.length) {
    
    			commit(cities, function(err) {
    				if (err) console.log(err)
    
    			})
    		} else {
    			// emitter.emit('exit')
    		}
    		log_and_time('finished indexing city locations.')
    	}))
    })
    
    
    
    // emitter.once('exit', function() {
    // 	console.log('exitting')
    // 	db.mongodb.close()
    // })
    
    emitter.once('exit', function() {
    	fibrous.run(function() {
    		console.log('exiting')
    			// var rs = mysql.connection.sync.query(config.mysql.sql)
    			// console.log('locations size',rs.length)
    			// var last_city
    			// for(var i in rs){
    			// 	var loc = rs[i]
    			// 	loc.latitude = loc.latitude/3600000
    			// 	loc.longitude = loc.longitude/3600000
    			// 	var near_city
    			// 	if(last_city){
    			// 		var dis = distance(loc.latitude,loc.longitude,last_city.geo.lat,last_city.geo.lng)
    			// 		if(dis>2){
    			// 			//console.log('distance from',last_city.province_name,'[',last_city.name,']','is',dis)
    			// 			near_city = null
    			// 		}
    			// 	}
    			// 	if(!near_city){
    			// 		near_city = db.city.sync.findOne({geo:{$near:[loc.latitude,loc.longitude]}},{_id:0,geo:1,province_name:1,name:1})
    			// 		console.log(loc.latitude,loc.longitude,near_city.province_name,'[',near_city.name,']')
    			// 	}
    			// 	last_city = near_city
    			// }
    		db.mongodb.close()
    			// mysql.connection.end()
    	})
    })
    

    补充一张异常图: QQ图片20170207115737.png



沪ICP备19023445号-2号
友情链接