IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    spark-stream 访问 Redis

    梦见山发表于 2016-08-12 14:48:31
    love 0

    最近在spark-stream上写了一些流计算处理程序,程序架构如下

    clipboard.png

    程序运行在Spark-stream上,我的目标是kafka、Redis的参数都支持在启动时指定。

    在写代码时参考了这篇文章 https://www.iteblog.com/archi...,该文讲的比较清楚,但是有两个问题:

    1. 用scala实现的

    2. Redis服务器的地址是写死的,我的程序要挪个位置,要重新改代码编译。

    当时倒腾了一些时间,现在写出来和大家分享,提高后来者的效率。

    clipboard.png

    如上图Spark是分布式引擎,Driver中创建的Redis Pool,在Worker上又得重新创建,参考文章中是定义一个Redis连接池管理类,Redis Pool是类的静态变量,类加载时由JVM自动创建。这个和我的预期有差距。

    在Driver中创建Redis管理对象,然后将该对象广播,然后在Worker上获取该广播对象,从而实现参数可变,但是Redis管理对象在每个Worker上又只实例化了一次。

    Driver

    Driver 指定序列化方式,Spark支持两种序列化方式,Java 和 Kryo,Kryo更高效。

    资料上说Kryo方式需要注册类,但是我没有注册也能成功运行。

    public static void main(String[] args) {
            if (args.length < 3) {
                System.err.println("Usage: kafka_spark_redis <brokers> <topics> <redisServer>\n" +
                        "  <brokers> Kafka broker列表\n" +
                        "  <topics> 要消费的topic列表\n" +
                        " <redisServer> redis 服务器地址 \n\n");
                System.exit(1);
            }
    
            /* 解析参数 */
            String brokers = args[0];
            String topics = args[1];
            String redisServer = args[2];
    
            // 创建stream context,两秒钟的数据算一批
            SparkConf sparkConf = new SparkConf().setAppName("kafka_spark_redis");
    //        sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");//java的序列号速度没有Kryo速度快
            sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    //        sparkConf.set("spark.kryo.registrator", "MyRegistrator");
            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
            JavaSparkContext sc = jssc.sparkContext();
    
            HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
            HashMap<String, String> kafkaParams = new HashMap<String, String>();
            kafkaParams.put("metadata.broker.list", brokers);
            kafkaParams.put("group.id","kakou-test");
    
            //Redis连接池管理类
            RedisClient redisClient = new RedisClient(redisServer);//创建redis连接池管理类
    
            //广播Reids连接池管理对象
            final Broadcast<RedisClient> broadcastRedis = sc.broadcast(redisClient);
    
            // 创建流处理对象
            JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                    jssc,
                    String.class,               /* kafka key class */
                    String.class,               /* kafka value class */
                    StringDecoder.class,        /* key 解码类 */
                    StringDecoder.class,        /* value 解码类 */
                    kafkaParams,                /* kafka 参数,如设置kafka broker */
                    topicsSet                   /* 待消费的topic名称 */
            );
    
            // 将行分拆为单词
            JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
                //@Override
                // kafka传来key-value对
                public String call(Tuple2<String, String> tuple2) {
    
                    // 取value值
                    return tuple2._2();
                }
            });
            /* 大量省略 */
            ........
        }

    RedisClient

    RedisClient 是自己实现的类,在类中重载write/read这两个序列化和反序列化函数,需要注意的是如果是Java Serializer 需要实现其它的接口。

    在Driver广播时会触发调用write序列化函数。

    public class RedisClient implements KryoSerializable {
        public static JedisPool jedisPool;
        public String host;
    
        public RedisClient(){
            Runtime.getRuntime().addShutdownHook(new CleanWorkThread());
        }
    
        public RedisClient(String host){
            this.host=host;
            Runtime.getRuntime().addShutdownHook(new CleanWorkThread());
            jedisPool = new JedisPool(new GenericObjectPoolConfig(), host);
        }
    
        static class CleanWorkThread extends Thread{
            @Override
            public void run() {
                System.out.println("Destroy jedis pool");
                if (null != jedisPool){
                    jedisPool.destroy();
                    jedisPool = null;
                }
            }
        }
    
        public Jedis getResource(){
            return jedisPool.getResource();
        }
    
        public void returnResource(Jedis jedis){
            jedisPool.returnResource(jedis);
        }
    
        public void write(Kryo kryo, Output output) {
            kryo.writeObject(output, host);
        }
    
        public void read(Kryo kryo, Input input) {
            host=kryo.readObject(input, String.class);
            this.jedisPool =new JedisPool(new GenericObjectPoolConfig(), host) ;
        }
    }

    Worker

    在foreachRDD中获取广播变量,由广播变量触发先调用RedisClient的无参反序列化函数,然后再调用反序列化函数,我们的做法是在反序列化函数中创建Redis Pool。

            //标准输出,对车辆的车牌和黑名单进行匹配,对与匹配成功的,保存到redis上。
            paircar.foreachRDD(new Function2<JavaRDD<HashMap<String, String>>, Time, Void>() {
                public Void call(JavaRDD<HashMap<String, String>> rdd, Time time) throws Exception {
                    Date now=new Date();
                    rdd.foreachPartition(new VoidFunction<Iterator<HashMap<String, String>>>() {
                        public void call(Iterator<HashMap<String, String>> it) throws Exception {
                            String tmp1;
                            String tmp2;
                            Date now=new Date();
                            RedisClient redisClient=broadcastRedis.getValue();
                            Jedis jedis=redisClient.getResource();
    
                            ......
    
                            redisClient.returnResource(jedis);
                        }
                    });

    结语

    Spark对分布式计算做了封装,但很多场景下还是要了解它的工作机制,很多问题和性能优化都和Spark的工作机制紧密相关。



沪ICP备19023445号-2号
友情链接