
    [Original] MetaQ Cluster Installation and Testing

    Posted by liuzhoulong on 2014-02-26 12:43:20

    1. Install a ZooKeeper cluster; look up the installation steps yourself.

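    2. Download the source from https://github.com/killme2008/Metamorphosis/tree/metamorphosis-all-1.4.6.2. If you do not want to compile it yourself, you can download a prebuilt package from http://fnil.net/downloads/index.html instead. I chose to compile it myself, mainly so that if a problem comes up later I can modify the source and rebuild.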

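    3. Build with Maven (set up the Maven environment yourself first). After downloading the all project, you need to build its subproject metamorphosis-server-wrapper. From a Windows command prompt, change into that directory and run mvn eclipse:eclipse, then import the project into Eclipse and build it with the Eclipse plugin. Alternatively, run mvn clean install -Dmaven.test.skip=true directly in that directory. When the build finishes, the jar is generated under the target directory.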

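    You can also create a lib folder in the project and run: mvn dependency:copy-dependencies -DoutputDirectory=lib (without -DoutputDirectory, the dependencies are written to target/dependency by default). Then copy the jar produced by install into lib as well.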

    4. After the build completes, upload the result to the server.

    Then edit the conf/server.ini file:

    [system]
    brokerId=2
    numPartitions=1
    serverPort=8123
    dashboardHttpPort=8120
    unflushThreshold=0
    unflushInterval=10000
    maxSegmentSize=1073741824
    maxTransferSize=1048576
    deletePolicy=delete,168
    deleteWhen=0 0 6,18 * * ?
    flushTxLogAtCommit=1
    stat=true
    dataPath=/data1/metaq/data
    dataLogPath=/data1/metaq/log

    [zookeeper]
    zk.zkConnect=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
    zk.zkSessionTimeoutMs=30000
    zk.zkConnectionTimeoutMs=30000
    zk.zkSyncTimeMs=5000

    ;; Topics section
    [topic=test]

    [topic=meta-test]



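    For a cluster, change the settings above that the original post highlighted in red; brokerId only has to be different on every server node.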
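The per-node brokerId requirement can be sketched as a small generator that takes one shared server.ini template and produces one copy per node. This is only an illustration: the class BrokerIniRenderer and its methods are hypothetical helpers, not part of MetaQ, and numbering the brokers from 0 is an assumption (any distinct integers would do).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of MetaQ: renders one server.ini per node
// from a shared template so that every node gets a distinct brokerId.
class BrokerIniRenderer {

    // Replaces the value of the brokerId line; every other line is kept as-is.
    static String withBrokerId(String template, int brokerId) {
        String[] lines = template.split("\n", -1);
        boolean replaced = false;
        for (int i = 0; i < lines.length; i++) {
            if (lines[i].trim().startsWith("brokerId=")) {
                lines[i] = "brokerId=" + brokerId;
                replaced = true;
            }
        }
        if (!replaced) {
            throw new IllegalArgumentException("template has no brokerId line");
        }
        return String.join("\n", lines);
    }

    // Renders nodeCount configs with brokerId 0, 1, 2, ... (assumed numbering).
    static List<String> renderAll(String template, int nodeCount) {
        List<String> configs = new ArrayList<>();
        for (int id = 0; id < nodeCount; id++) {
            configs.add(withBrokerId(template, id));
        }
        return configs;
    }

    public static void main(String[] args) {
        String template = "[system]\nbrokerId=0\nnumPartitions=1\nserverPort=8123\n";
        for (String config : renderAll(template, 3)) {
            System.out.println(config);
        }
    }
}
```

Each rendered string would then be written to conf/server.ini on the matching node before the package is distributed.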

    If you specify dataPath and dataLogPath yourself, create those directories with mkdir on every server.
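As an alternative to running mkdir by hand, the directory step could be scripted. The sketch below reads the dataPath and dataLogPath entries from an ini file and creates both directories. DataDirs is a hypothetical helper written for this post, and the default location conf/server.ini is an assumption taken from the step above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of MetaQ: creates the directories named by
// dataPath and dataLogPath in a server.ini file.
class DataDirs {

    // Collects the values of the dataPath and dataLogPath lines.
    static List<Path> dataPaths(List<String> iniLines) {
        List<Path> paths = new ArrayList<>();
        for (String raw : iniLines) {
            String line = raw.trim();
            if (line.startsWith("dataPath=") || line.startsWith("dataLogPath=")) {
                paths.add(Paths.get(line.substring(line.indexOf('=') + 1).trim()));
            }
        }
        return paths;
    }

    // Creates each directory including missing parents; existing ones are left alone.
    static void ensureExist(List<Path> paths) throws IOException {
        for (Path path : paths) {
            Files.createDirectories(path);
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed default location; pass another path as the first argument.
        Path ini = Paths.get(args.length > 0 ? args[0] : "conf/server.ini");
        List<Path> paths = dataPaths(Files.readAllLines(ini));
        ensureExist(paths);
        System.out.println("Ensured directories: " + paths);
    }
}
```

It would have to run on every node, under a user that is allowed to write to the parent directory.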

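    Distribute the package to every node, then run metaServer.sh start in the bin directory on each node.

    To stop the broker, run metaServer.sh stop.

    To check its status, run sh metaServer.sh status.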
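Besides the status command, a quick check from another machine is to see whether the broker port accepts TCP connections. The following is a minimal sketch of such a probe. It is not what metaServer.sh status does; BrokerPortProbe is a hypothetical helper, and the default port 8123 is simply the serverPort value from the configuration above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of MetaQ: checks whether something is
// accepting TCP connections on the broker port.
class BrokerPortProbe {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        // 8123 is the serverPort configured in server.ini above.
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 8123;
        boolean up = isListening(host, port, 2000);
        System.out.println(host + ":" + port + (up ? " is accepting connections" : " is not reachable"));
    }
}
```

A successful connection only shows that the port is open, not that the broker has registered itself in ZooKeeper.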

    5. Application example: an asynchronous consumer (AsyncConsum), followed by a producer (Products)

    package com.test.metaq;
    
    import java.util.concurrent.Executor;
    
    import com.taobao.metamorphosis.Message;
    import com.taobao.metamorphosis.client.MessageSessionFactory;
    import com.taobao.metamorphosis.client.MetaClientConfig;
    import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
    import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
    import com.taobao.metamorphosis.client.consumer.MessageConsumer;
    import com.taobao.metamorphosis.client.consumer.MessageListener;
    import com.taobao.metamorphosis.exception.MetaClientException;
    import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;
    
    public class AsyncConsum {
    
        public static void main(String[] args) throws MetaClientException {
            // Point the client at the ZooKeeper ensemble used by the brokers.
            final MetaClientConfig metaClientConfig = new MetaClientConfig();
            final ZKConfig zkConfig = new ZKConfig();
            zkConfig.zkConnect = "192.168.1.1:2181";
            metaClientConfig.setZkConfig(zkConfig);

            // Let a failure propagate instead of continuing with a null factory.
            final MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);

            final String topic = "test";
            final String group = "meta-example";
            final MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group));

            // Subscribe with a maximum fetch size of 1 MB per request.
            consumer.subscribe(topic, 1024 * 1024, new MessageListener() {
                // "recieveMessages" is the spelling used by the MetaQ API.
                public void recieveMessages(Message message) {
                    System.out.println("Receive message " + new String(message.getData()));
                }

                // No dedicated thread pool is supplied for message handling.
                public Executor getExecutor() {
                    return null;
                }
            });
            consumer.completeSubscribe();
        }
    
    }
    package com.test.metaq;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    
    import com.taobao.metamorphosis.Message;
    import com.taobao.metamorphosis.client.MessageSessionFactory;
    import com.taobao.metamorphosis.client.MetaClientConfig;
    import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
    import com.taobao.metamorphosis.client.producer.MessageProducer;
    import com.taobao.metamorphosis.client.producer.SendResult;
    import com.taobao.metamorphosis.exception.MetaClientException;
    import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;
    
    public class Products {
    
        public static void main(String[] args)
                throws MetaClientException, IOException, InterruptedException {
            // Point the client at the ZooKeeper ensemble used by the brokers.
            final MetaClientConfig metaClientConfig = new MetaClientConfig();
            final ZKConfig zkConfig = new ZKConfig();
            zkConfig.zkConnect = "192.168.1.1:2181";
            metaClientConfig.setZkConfig(zkConfig);

            // Let a failure propagate instead of continuing with a null factory.
            final MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);

            final MessageProducer producer = sessionFactory.createProducer();
            final String topic = "test";
            // Publish the topic before sending any message to it.
            producer.publish(topic);

            // Send every line typed on standard input as one message.
            final BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            String line;
            while ((line = reader.readLine()) != null) {
                final SendResult sendResult = producer.sendMessage(new Message(topic, line.getBytes()));
                if (!sendResult.isSuccess()) {
                    System.err.println("Send message failed, error message: "
                            + sendResult.getErrorMessage());
                } else {
                    System.out.println("Send message successfully, sent to "
                            + sendResult.getPartition());
                }
            }
        }
    
    }
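One detail in the two classes above is worth a note: the producer calls line.getBytes() and the consumer calls new String(message.getData()), and both rely on the platform default charset. If the two programs run on machines with different defaults, non-ASCII text can come out garbled. A minimal sketch of fixing the charset on both sides follows; MessagePayloads is a hypothetical helper, and choosing UTF-8 is an assumption rather than something MetaQ requires.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of MetaQ: converts message text to and from
// bytes with an explicit charset instead of the platform default.
class MessagePayloads {

    // Producer side: use in place of line.getBytes().
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: use in place of new String(message.getData()).
    static String decode(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "\u4f60\u597d" is a two-character Chinese greeting written with escapes.
        String original = "\u4f60\u597d, MetaQ";
        byte[] payload = encode(original);
        System.out.println(payload.length + " bytes, round trip ok: "
                + original.equals(decode(payload)));
    }
}
```

In the producer this would read new Message(topic, MessagePayloads.encode(line)), and in the listener MessagePayloads.decode(message.getData()).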


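    After packaging the classes as test.jar (the MetaQ client and its dependencies must be bundled in the jar or added to -cp), upload it to the server and run java -cp test.jar com.test.metaq.Products, then type messages on the command line.

    With the same test.jar on the server, run java -cp test.jar com.test.metaq.AsyncConsum; its console prints the messages as they are received.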

    
    
    
                        
                    

