ZStack Source Code Analysis: A Tour of the Core Libraries, EventFacade and CloudBus
Published: 2019-06-20


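This article was first published in 泊浮目's Jianshu column.

Preface

Event-driven and message-driven designs are both powerful tools for decoupling. As a large software project, ZStack uses both to decouple its overall architecture.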

EventFacade

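EventFacade is an interesting component because it is almost self-contained. If you are interested, you can copy and paste it and, with minor modifications, get it working in your own project.

How to use it

The ZStack repo provides a corresponding test case: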

package org.zstack.test.core.cloudbus;

/**
 * Created with IntelliJ IDEA.
 * User: frank
 * Time: 12:38 AM
 * To change this template use File | Settings | File Templates.
 */
public class TestCanonicalEvent {
    CLogger logger = Utils.getLogger(TestCanonicalEvent.class);
    ComponentLoader loader;
    EventFacade evtf;
    boolean success;

    @Before
    public void setUp() throws Exception {
        BeanConstructor con = new BeanConstructor();
        loader = con.build();
        evtf = loader.getComponent(EventFacade.class);
        ((EventFacadeImpl) evtf).start();
    }

    @Test
    public void test() throws InterruptedException {
        String path = "/test/event";
        evtf.on(path, new EventRunnable() {
            @Override
            public void run() {
                success = true;
            }
        });

        evtf.fire(path, null);
        TimeUnit.SECONDS.sleep(1);
        Assert.assertTrue(success);
    }
}

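Usage is very simple: register a callback on a path to receive events, then fire an event on that path, and the callback registered for it is invoked.

Interface definition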

package org.zstack.core.cloudbus;

import java.util.Map;

/**
 * Created with IntelliJ IDEA.
 * User: frank
 * Time: 11:29 PM
 * To change this template use File | Settings | File Templates.
 */
public interface EventFacade {
    void on(String path, AutoOffEventCallback cb);

    void on(String path, EventCallback cb);

    void on(String path, EventRunnable runnable);

    void off(AbstractEventFacadeCallback cb);

    void onLocal(String path, AutoOffEventCallback cb);

    void onLocal(String path, EventCallback cb);

    void onLocal(String path, EventRunnable runnable);

    void fire(String path, Object data);

    boolean isFromThisManagementNode(Map tokens);

    String META_DATA_MANAGEMENT_NODE_ID = "metadata::managementNodeId";
    String META_DATA_PATH = "metadata::path";
    String WEBHOOK_TYPE = "CanonicalEvent";
}

Reading the source

on

    @Override
    public void on(String path, AutoOffEventCallback cb) {
        global.put(cb.uniqueIdentity, new CallbackWrapper(path, cb));
    }

    @Override
    public void on(String path, final EventCallback cb) {
        global.put(cb.uniqueIdentity, new CallbackWrapper(path, cb));
    }

    @Override
    public void on(String path, EventRunnable cb) {
        global.put(cb.uniqueIdentity, new CallbackWrapper(path, cb));
    }

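The on method simply takes the callback's unique identity (a UUID) as the key and puts the wrapped callback into the global map as the value. Why do it this way? Map keys must be unique, so using the path as the key would not work: several callbacks may need to listen on the same path.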
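To make the "key by callback identity, not by path" idea concrete, here is a minimal sketch. It is not the ZStack implementation: MiniEventFacade, its Callback type, and the exact-match rule in fire are all hypothetical names and simplifications chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for EventFacadeImpl; not the ZStack code.
class MiniEventFacade {
    // Each callback carries its own unique identity, similar in spirit to uniqueIdentity above.
    static abstract class Callback {
        final String uniqueIdentity = UUID.randomUUID().toString();
        abstract void run(String path, Object data);
    }

    // Pairs a callback with the path it listens on.
    private static final class Wrapper {
        final String path;
        final Callback cb;
        Wrapper(String path, Callback cb) { this.path = path; this.cb = cb; }
    }

    // Keyed by callback identity, so several callbacks can share one path.
    private final Map<String, Wrapper> callbacks = new ConcurrentHashMap<>();

    void on(String path, Callback cb) {
        callbacks.put(cb.uniqueIdentity, new Wrapper(path, cb));
    }

    void off(Callback cb) {
        callbacks.remove(cb.uniqueIdentity);
    }

    // Invokes every callback registered on exactly this path and returns how many ran.
    int fire(String path, Object data) {
        int called = 0;
        for (Wrapper w : new ArrayList<>(callbacks.values())) {
            if (w.path.equals(path)) {
                w.cb.run(path, data);
                called++;
            }
        }
        return called;
    }
}
```

If the map were keyed by path instead, registering a second callback on "/test/event" would silently replace the first one. Keying by identity also makes off trivial, since the callback object itself is enough to find its entry.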

The class declaration and member fields of EventFacadeImpl:

public class EventFacadeImpl implements EventFacade, CloudBusEventListener, Component, GlobalApiMessageInterceptor {
    @Autowired
    private CloudBus bus;

    private final Map<String, CallbackWrapper> global = Collections.synchronizedMap(new HashMap<>());
    private final Map<String, CallbackWrapper> local = new ConcurrentHashMap<>();

    private EventSubscriberReceipt unsubscriber;

fire

The corresponding fire method:

    @Override
    public void fire(String path, Object data) {
        assert path != null;
        CanonicalEvent evt = new CanonicalEvent();
        evt.setPath(path);
        evt.setManagementNodeId(Platform.getManagementServerId());
        if (data != null) {
            /*
            if (!TypeUtils.isPrimitiveOrWrapper(data.getClass()) && !data.getClass().isAnnotationPresent(NeedJsonSchema.class)) {
                throw new CloudRuntimeException(String.format("data[%s] passed to canonical event is not annotated by @NeedJsonSchema", data.getClass().getName()));
            }
            */
            evt.setContent(data);
        }

        // find the matching callbacks in the local map and invoke them
        fireLocal(evt);
        // send the event to the matching webhooks
        callWebhooks(evt);
        // publish the event through CloudBus; its source is covered later
        bus.publish(evt);
    }

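The difference between onLocal and on

The analysis above does not show how events in the global map get triggered. To understand that fully we need CloudBus, which is covered later. Still, it is already possible to guess why on and onLocal are kept separate: one is triggered through the message bus, the other inside the current JVM process. In other words, on supports events across a cluster of management nodes (MNs), while onLocal only supports events within a single MN. This follows from ZStack's business scenarios: some things must be done by every MN, while others need to be done by only one. For reasons of space the details are omitted here; interested readers can browse the code.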

WebHook

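WebHook is one of the ways ZStack actively pushes information to the front end. Once a webhook is registered for an event path, firing that path sends the event content to the webhook's URL. The test cases CanonicalEventWebhookCase and WebhookCase show how to use it correctly.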
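The test case registers a webhook on "/test/{uuid}/event" and expects it to fire for a concrete path containing a real UUID. How ZStack matches such templates is not shown in this article, so the following is only one plausible sketch: EventPathMatcher is a hypothetical helper, and the rule that each {name} placeholder matches exactly one path segment is an assumption.

```java
import java.util.regex.Pattern;

// Hypothetical helper; the real matching logic in ZStack may differ.
class EventPathMatcher {
    // Turns "/test/{uuid}/event" into a regex in which each {name} matches one path segment.
    static Pattern compile(String template) {
        StringBuilder regex = new StringBuilder();
        int pos = 0;
        while (pos < template.length()) {
            int open = template.indexOf('{', pos);
            int close = open < 0 ? -1 : template.indexOf('}', open);
            if (open < 0 || close < 0) {
                // No further placeholder: the rest of the template is literal text.
                regex.append(Pattern.quote(template.substring(pos)));
                break;
            }
            regex.append(Pattern.quote(template.substring(pos, open)));
            regex.append("[^/]+");
            pos = close + 1;
        }
        return Pattern.compile(regex.toString());
    }

    static boolean matches(String template, String path) {
        return compile(template).matcher(path).matches();
    }
}
```

With this rule, EventPathMatcher.matches("/test/{uuid}/event", "/test/abc/event") holds, while a webhook registered on "/this-path-does-not-match" is never selected for that event.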

class CanonicalEventWebhookCase extends SubCase {
    EnvSpec envSpec

    @Override
    void clean() {
        envSpec.delete()
    }

    @Override
    void setup() {
        INCLUDE_CORE_SERVICES = false
        spring {
            include("webhook.xml")
        }
    }

    String WEBHOOK_PATH = "/canonical-event-webhook"

    void testErrorToCreateWebhookifOpaqueFieldMissing() {
        expect(AssertionError.class) {
            createWebhook {
                name = "webhook1"
                url = "http://127.0.0.1:8989$WEBHOOK_PATH"
                type = EventFacade.WEBHOOK_TYPE
            }
        }
    }

    void testCanonicalEventWithVariableInPath() {
        String path = "/test/{uuid}/event"
        int count = 0

        WebhookInventory hook1 = createWebhook {
            name = "webhook1"
            url = "http://127.0.0.1:8989$WEBHOOK_PATH"
            type = EventFacade.WEBHOOK_TYPE
            opaque = path
        }

        // this webhook will not be called because path unmatching
        WebhookInventory hook2 = createWebhook {
            name = "webhook1"
            url = "http://127.0.0.1:8989$WEBHOOK_PATH"
            type = EventFacade.WEBHOOK_TYPE
            opaque = "/this-path-does-not-match"
        }

        CanonicalEvent evt
        envSpec.simulator(WEBHOOK_PATH) { HttpEntity<String> e ->
            evt = json(e.getBody(), CanonicalEvent.class)
            count ++
            return [:]
        }

        String content = "hello world"
        String eventPath = "/test/${Platform.uuid}/event"
        bean(EventFacade.class).fire(eventPath, content)

        retryInSecs {
            assert count == 1
            assert evt != null
            assert evt.path == eventPath
            assert evt.content == content
            assert evt.managementNodeId == Platform.getManagementServerId()
        }
    }

    void testCanonicalEventUseWebhook() {
        String path = "/test/event"

        WebhookInventory hook1 = createWebhook {
            name = "webhook1"
            url = "http://127.0.0.1:8989$WEBHOOK_PATH"
            type = EventFacade.WEBHOOK_TYPE
            opaque = path
        }

        WebhookInventory hook2 = createWebhook {
            name = "webhook2"
            url = "http://127.0.0.1:8989$WEBHOOK_PATH"
            type = EventFacade.WEBHOOK_TYPE
            opaque = path
        }

        def testFireTwoEvents = {
            List<CanonicalEvent> evts = []
            envSpec.simulator(WEBHOOK_PATH) { HttpEntity<String> e ->
                CanonicalEvent evt = json(e.getBody(), CanonicalEvent.class)
                evts.add(evt)
                return [:]
            }

            String content = "hello world"
            bean(EventFacade.class).fire(path, content)

            retryInSecs {
                assert evts.size() == 2
                CanonicalEvent evt1 = evts[0]
                CanonicalEvent evt2 = evts[1]
                assert evt1.path == path
                assert evt1.content == content
                assert evt1.managementNodeId == Platform.getManagementServerId()
                assert evt2.path == path
                assert evt2.content == content
                assert evt2.managementNodeId == Platform.getManagementServerId()
            }
        }

        def testOneEventsGetAfterDeleteOneHook = {
            deleteWebhook {
                uuid = hook1.uuid
            }

            List<CanonicalEvent> evts = []
            envSpec.simulator(WEBHOOK_PATH) { HttpEntity<String> e ->
                CanonicalEvent evt = json(e.getBody(), CanonicalEvent.class)
                evts.add(evt)
                return [:]
            }

            String content = "hello world"
            bean(EventFacade.class).fire(path, content)

            retryInSecs {
                assert evts.size() == 1
            }
        }

        def testNoEventGetAfterDeleteAllHooks = {
            deleteWebhook {
                uuid = hook2.uuid
            }

            List<CanonicalEvent> evts = []
            envSpec.simulator(WEBHOOK_PATH) { HttpEntity<String> e ->
                CanonicalEvent evt = json(e.getBody(), CanonicalEvent.class)
                evts.add(evt)
                return [:]
            }

            String content = "hello world"
            bean(EventFacade.class).fire(path, content)

            retryInSecs {
                assert evts.size() == 0
            }
        }

        testFireTwoEvents()
        testOneEventsGetAfterDeleteOneHook()
        testNoEventGetAfterDeleteAllHooks()
    }

    @Override
    void environment() {
        envSpec = env {
            // nothing
        }
    }

    @Override
    void test() {
        envSpec.create {
            testCanonicalEventUseWebhook()
            testCanonicalEventWithVariableInPath()
            testErrorToCreateWebhookifOpaqueFieldMissing()
        }
    }
}

class WebhookCase extends SubCase {
    EnvSpec envSpec

    @Override
    void clean() {
        envSpec.delete()
    }

    @Override
    void setup() {
        INCLUDE_CORE_SERVICES = false
        spring {
            include("webhook.xml")
        }
    }

    @Override
    void environment() {
        envSpec = env {
            // nothing
        }
    }

    void testWebhooksCRUD() {
        WebhookInventory hook = null

        def testCreateWebhook = {
            def params = null
            hook = createWebhook {
                name = "webhook"
                type = "custom-type"
                url = "http://127.0.0.1:8080/webhook"
                description = "desc"
                opaque = "test data"
                params = delegate
            }

            assert dbIsExists(hook.uuid, WebhookVO.class)
            assert hook.name == params.name
            assert hook.type == params.type
            assert hook.url == params.url
            assert hook.description == params.description
            assert hook.opaque == params.opaque
        }

        def testQueryWebhook = {
            List<WebhookInventory> invs = queryWebhook {
                conditions = ["name=${hook.name}"]
            }

            assert invs.size() == 1
            assert invs[0].uuid == hook.uuid
        }

        def testDeleteWebhook = {
            deleteWebhook {
                uuid = hook.uuid
            }

            assert !dbIsExists(hook.uuid, WebhookVO.class)
        }

        testCreateWebhook()
        testQueryWebhook()
        testDeleteWebhook()
    }

    void testInvalidUrl() {
        expect(AssertionError.class) {
            createWebhook {
                name = "webhook"
                type = "custom-type"
                url = "this is not a url"
                description = "desc"
                opaque = "test data"
            }
        }
    }

    @Override
    void test() {
        envSpec.create {
            testWebhooksCRUD()
            testInvalidUrl()
        }
    }
}

CloudBus

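CloudBus is arguably the most important component in ZStack. All communication between ZStack modules is done through Messages, and CloudBus is the medium that carries them. Let's look at its source.

This section assumes some familiarity with AMQP. If you are new to it, you may want to read my blog post on it first.

How to use it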

package org.zstack.test.core.cloudbus;

import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import org.zstack.core.cloudbus.CloudBusIN;
import org.zstack.core.componentloader.ComponentLoader;
import org.zstack.header.AbstractService;
import org.zstack.header.Service;
import org.zstack.header.message.Message;
import org.zstack.header.message.MessageReply;
import org.zstack.header.message.NeedReplyMessage;
import org.zstack.test.BeanConstructor;
import org.zstack.utils.Utils;
import org.zstack.utils.logging.CLogger;

import java.util.concurrent.TimeUnit;

public class TestCloudBusCall {
    CLogger logger = Utils.getLogger(TestCloudBusCall.class);
    ComponentLoader loader;
    CloudBusIN bus;
    Service serv;

    public static class HelloWorldMsg extends NeedReplyMessage {
        private String greet;

        public String getGreet() {
            return greet;
        }

        public void setGreet(String greet) {
            this.greet = greet;
        }
    }

    public static class HelloWorldReply extends MessageReply {
        private String greet;

        public String getGreet() {
            return greet;
        }

        public void setGreet(String greet) {
            this.greet = greet;
        }
    }

    class FakeService extends AbstractService {
        @Override
        public boolean start() {
            bus.registerService(this);
            bus.activeService(this);
            return true;
        }

        @Override
        public boolean stop() {
            bus.deActiveService(this);
            bus.unregisterService(this);
            return true;
        }

        @Override
        public void handleMessage(Message msg) {
            if (msg.getClass() == HelloWorldMsg.class) {
                HelloWorldMsg hmsg = (HelloWorldMsg) msg;
                HelloWorldReply r = new HelloWorldReply();
                r.setGreet(hmsg.getGreet());
                bus.reply(msg, r);
            }
        }

        @Override
        public String getId() {
            return this.getClass().getCanonicalName();
        }
    }

    @Before
    public void setUp() throws Exception {
        BeanConstructor con = new BeanConstructor();
        loader = con.build();
        bus = loader.getComponent(CloudBusIN.class);
        serv = new FakeService();
        serv.start();
    }

    @Test
    public void test() throws InterruptedException, ClassNotFoundException {
        HelloWorldMsg msg = new HelloWorldMsg();
        msg.setGreet("Hello");
        msg.setServiceId(FakeService.class.getCanonicalName());
        msg.setTimeout(TimeUnit.SECONDS.toMillis(10));
        HelloWorldReply r = (HelloWorldReply) bus.call(msg);
        serv.stop();
        Assert.assertEquals("Hello", r.getGreet());
    }
}

We register a Service and override its handleMessage method. In the test case the service receives the message, replies, and the assertion passes. Here is another one:

package org.zstack.test.core.cloudbus;

import junit.framework.Assert;

import org.junit.Before;
import org.junit.Test;
import org.zstack.core.cloudbus.CloudBusCallBack;
import org.zstack.core.cloudbus.CloudBusIN;
import org.zstack.core.componentloader.ComponentLoader;
import org.zstack.header.AbstractService;
import org.zstack.header.Service;
import org.zstack.header.message.Message;
import org.zstack.header.message.MessageReply;
import org.zstack.header.message.NeedReplyMessage;
import org.zstack.test.BeanConstructor;
import org.zstack.utils.Utils;
import org.zstack.utils.logging.CLogger;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.TimeUnit;

public class TestCloudBusSendCallback {

    CLogger logger = Utils.getLogger(TestCloudBusSendCallback.class);
    ComponentLoader loader;
    CloudBusIN bus;
    CountDownLatch latch = new CountDownLatch(1);
    boolean isSuccess = false;
    Service serv;

    public static class HelloWorldMsg extends NeedReplyMessage {
        private String greet;

        public String getGreet() {
            return greet;
        }

        public void setGreet(String greet) {
            this.greet = greet;
        }
    }

    public static class HelloWorldReply extends MessageReply {
        private String greet;

        public String getGreet() {
            return greet;
        }

        public void setGreet(String greet) {
            this.greet = greet;
        }
    }

    class FakeService extends AbstractService {
        @Override
        public boolean start() {
            bus.registerService(this);
            bus.activeService(this);
            return true;
        }

        @Override
        public boolean stop() {
            bus.deActiveService(this);
            bus.unregisterService(this);
            return true;
        }

        @Override
        public void handleMessage(Message msg) {
            if (msg.getClass() == HelloWorldMsg.class) {
                HelloWorldMsg hmsg = (HelloWorldMsg) msg;
                HelloWorldReply r = new HelloWorldReply();
                r.setGreet(hmsg.getGreet());
                bus.reply(msg, r);
            }
        }

        @Override
        public String getId() {
            return this.getClass().getCanonicalName();
        }
    }

    @Before
    public void setUp() throws Exception {
        BeanConstructor con = new BeanConstructor();
        loader = con.build();
        bus = loader.getComponent(CloudBusIN.class);
        serv = new FakeService();
        serv.start();
    }

    @Test
    public void test() throws InterruptedException, ClassNotFoundException {
        HelloWorldMsg msg = new HelloWorldMsg();
        msg.setGreet("Hello");
        msg.setServiceId(FakeService.class.getCanonicalName());
        msg.setTimeout(TimeUnit.SECONDS.toMillis(10));
        bus.send(msg, new CloudBusCallBack(null) {
            @Override
            public void run(MessageReply reply) {
                if (reply instanceof HelloWorldReply) {
                    HelloWorldReply hr = (HelloWorldReply) reply;
                    if ("Hello".equals(hr.getGreet())) {
                        isSuccess = true;
                    }
                }
                latch.countDown();
            }
        });
        latch.await(15, TimeUnit.SECONDS);
        serv.stop();
        Assert.assertEquals(true, isSuccess);
    }

}

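This one also registers a Service, but sends the message with a callback. Run it and the assertion passes, which means the callback was invoked. In short, using CloudBus is simple: register your Service, send a message through CloudBus addressed to that Service, and the Service receives it. If you need a callback for the reply, that is just as easy.

Interface definition

Something this convenient is unlikely to be simple inside. Let's start with the interface: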

package org.zstack.core.cloudbus;

import org.zstack.header.Component;
import org.zstack.header.Service;
import org.zstack.header.errorcode.ErrorCode;
import org.zstack.header.exception.CloudConfigureFailException;
import org.zstack.header.message.*;

import java.util.List;

public interface CloudBus extends Component {
    void send(Message msg);

    void send(List msgs);

    void send(NeedReplyMessage msg, CloudBusCallBack callback);

    @Deprecated
    void send(List msgs, CloudBusListCallBack callBack);

    @Deprecated
    void send(List msgs, int parallelLevel, CloudBusListCallBack callBack);

    @Deprecated
    void send(List msgs, int parallelLevel, CloudBusSteppingCallback callback);

    void route(List msgs);

    void route(Message msg);

    void reply(Message request, MessageReply reply);

    void publish(List events);

    void publish(Event event);

    MessageReply call(NeedReplyMessage msg);

    List call(List msg);

    void registerService(Service serv) throws CloudConfigureFailException;

    void unregisterService(Service serv);

    EventSubscriberReceipt subscribeEvent(CloudBusEventListener listener, Event...events);

    void dealWithUnknownMessage(Message msg);

    void replyErrorByMessageType(Message msg, Exception e);

    void replyErrorByMessageType(Message msg, String err);

    void replyErrorByMessageType(Message msg, ErrorCode err);

    void logExceptionWithMessageDump(Message msg, Throwable e);

    String makeLocalServiceId(String serviceId);

    void makeLocalServiceId(Message msg, String serviceId);

    String makeServiceIdByManagementNodeId(String serviceId, String managementNodeId);

    void makeServiceIdByManagementNodeId(Message msg, String serviceId, String managementNodeId);

    String makeTargetServiceIdByResourceUuid(String serviceId, String resourceUuid);

    void makeTargetServiceIdByResourceUuid(Message msg, String serviceId, String resourceUuid);

    void installBeforeDeliveryMessageInterceptor(BeforeDeliveryMessageInterceptor interceptor, Class...classes);

    void installBeforeSendMessageInterceptor(BeforeSendMessageInterceptor interceptor, Class...classes);

    void installBeforePublishEventInterceptor(BeforePublishEventInterceptor interceptor, Class...classes);
}

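The method names in the interface are fairly self-explanatory, so no further explanation is given here. Let's start reading the source.

Reading the source

What does CloudBus do when ZStack starts?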

init

init is a hook provided by Spring that is called while the bean is being loaded. Its declaration can be seen in the XML:

The init method:

    void init() {
        trackerClose = CloudBusGlobalProperty.CLOSE_TRACKER;
        serverIps = CloudBusGlobalProperty.SERVER_IPS;
        tracker = new MessageTracker();

        ConnectionFactory connFactory = new ConnectionFactory();
        List<Address> addresses = CollectionUtils.transformToList(serverIps, new Function<Address, String>() {
            @Override
            public Address call(String arg) {
                return Address.parseAddress(arg);
            }
        });
        connFactory.setAutomaticRecoveryEnabled(true);
        connFactory.setRequestedHeartbeat(CloudBusGlobalProperty.RABBITMQ_HEART_BEAT_TIMEOUT);
        connFactory.setNetworkRecoveryInterval((int) TimeUnit.SECONDS.toMillis(CloudBusGlobalProperty.RABBITMQ_NETWORK_RECOVER_INTERVAL));
        connFactory.setConnectionTimeout((int) TimeUnit.SECONDS.toMillis(CloudBusGlobalProperty.RABBITMQ_CONNECTION_TIMEOUT));

        logger.info(String.format("use RabbitMQ server IPs: %s", serverIps));

        try {
            if (CloudBusGlobalProperty.RABBITMQ_USERNAME != null) {
                connFactory.setUsername(CloudBusGlobalProperty.RABBITMQ_USERNAME);
                logger.info(String.format("use RabbitMQ username: %s", CloudBusGlobalProperty.RABBITMQ_USERNAME));
            }
            if (CloudBusGlobalProperty.RABBITMQ_PASSWORD != null) {
                connFactory.setPassword(CloudBusGlobalProperty.RABBITMQ_PASSWORD);
                logger.info("use RabbitMQ password: ******");
            }
            if (CloudBusGlobalProperty.RABBITMQ_VIRTUAL_HOST != null) {
                connFactory.setVirtualHost(CloudBusGlobalProperty.RABBITMQ_VIRTUAL_HOST);
                logger.info(String.format("use RabbitMQ virtual host: %s", CloudBusGlobalProperty.RABBITMQ_VIRTUAL_HOST));
            }

            conn = connFactory.newConnection(addresses.toArray(new Address[]{}));
            logger.debug(String.format("rabbitmq connection is established on %s", conn.getAddress()));

            ((Recoverable)conn).addRecoveryListener(new RecoveryListener() {
                @Override
                public void handleRecovery(Recoverable recoverable) {
                    logger.info(String.format("rabbitmq connection is recovering on %s", conn.getAddress().toString()));
                }
            });

            channelPool = new ChannelPool(CloudBusGlobalProperty.CHANNEL_POOL_SIZE, conn);
            createExchanges();
            outboundQueue = new BusQueue(makeMessageQueueName(SERVICE_ID), BusExchange.P2P);
            Channel chan = channelPool.acquire();
            chan.queueDeclare(outboundQueue.getName(), false, false, true, queueArguments());
            chan.basicConsume(outboundQueue.getName(), true, consumer);
            chan.queueBind(outboundQueue.getName(), outboundQueue.getBusExchange().toString(), outboundQueue.getBindingKey());
            channelPool.returnChannel(chan);
            maid.construct();
            noRouteEndPoint.construct();
            tracker.construct();
            tracker.trackService(SERVICE_ID);
        } catch (Exception e) {
            throw new CloudRuntimeException(e);
        }
    }

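In short, this function reads the RabbitMQ-related settings from the configuration, initializes the Connection, and builds a channel pool on top of it. It then takes a channel from the pool, declares the outbound message queue, starts consuming from it, and binds it to its exchange. It also constructs EventMaid, noRouteEndPoint and tracker. The latter two are message consumers. As the names suggest, one serves the publish/subscribe model (every queue bound to that exchange receives the message) and one is used for tracking.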

start

start, by contrast, is a hook defined by ZStack itself. It is called when the management node comes up.

    @Override
    public boolean start() {
        populateExtension();
        prepareStatistics();

        for (Service serv : services) {
            assert serv.getId() != null : String.format("service id can not be null[%s]", serv.getClass().getName());
            registerService(serv);
        }

        jmxf.registerBean("CloudBus", this);

        return true;
    }

Let's go through them one by one:

    private void populateExtension() {
        services = pluginRgty.getExtensionList(Service.class);
        for (ReplyMessagePreSendingExtensionPoint extp : pluginRgty.getExtensionList(ReplyMessagePreSendingExtensionPoint.class)) {
            List<Class> clazzs = extp.getReplyMessageClassForPreSendingExtensionPoint();
            if (clazzs == null || clazzs.isEmpty()) {
                continue;
            }

            for (Class clz : clazzs) {
                if (!(APIEvent.class.isAssignableFrom(clz)) && !(MessageReply.class.isAssignableFrom(clz))) {
                    throw new CloudRuntimeException(String.format("ReplyMessagePreSendingExtensionPoint can only marshal APIEvent or MessageReply. %s claimed by %s is neither APIEvent nor MessageReply", clz.getName(), extp.getClass().getName()));
                }

                List<ReplyMessagePreSendingExtensionPoint> exts = replyMessageMarshaller.get(clz);
                if (exts == null) {
                    exts = new ArrayList<ReplyMessagePreSendingExtensionPoint>();
                    replyMessageMarshaller.put(clz, exts);
                }
                exts.add(extp);
            }
        }
    }

It first collects every implementation of Service, then loads the extension points that can modify message replies before they are sent.

    private void prepareStatistics() {
        List<Class> needReplyMsgs = BeanUtils.scanClassByType("org.zstack", NeedReplyMessage.class);
        needReplyMsgs = CollectionUtils.transformToList(needReplyMsgs, new Function<Class, Class>() {
            @Override
            public Class call(Class arg) {
                return !APIMessage.class.isAssignableFrom(arg) || APISyncCallMessage.class.isAssignableFrom(arg) ? arg : null;
            }
        });

        for (Class clz : needReplyMsgs) {
            MessageStatistic stat = new MessageStatistic();
            stat.setMessageClassName(clz.getName());
            statistics.put(stat.getMessageClassName(), stat);
        }
    }

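This sets up a statistics entry for every message type that needs a reply.

After that, start registers all the collected Services so that messages can be dispatched to them.

Commonly used methods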

CloudBus.makeLocalServiceId

    @Override
    public String makeLocalServiceId(String serviceId) {
        return serviceId + "." + Platform.getManagementServerId();
    }

    @Override
    public void makeLocalServiceId(Message msg, String serviceId) {
        msg.setServiceId(makeLocalServiceId(serviceId));
    }

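Every management node registers its own set of service queues, so the service ID has to be assembled in this format for the message to reach the service on the intended node.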

CloudBus.makeTargetServiceIdByResourceUuid

    @Override
    public String makeTargetServiceIdByResourceUuid(String serviceId, String resourceUuid) {
        DebugUtils.Assert(serviceId!=null, "serviceId cannot be null");
        DebugUtils.Assert(resourceUuid!=null, "resourceUuid cannot be null");
        // get the UUID of the MN that owns the resource
        String mgmtUuid = destMaker.makeDestination(resourceUuid);
        return serviceId + "." + mgmtUuid;
    }

    @Override
    public void makeTargetServiceIdByResourceUuid(Message msg, String serviceId, String resourceUuid) {
        String targetService = makeTargetServiceIdByResourceUuid(serviceId, resourceUuid);
        msg.setServiceId(targetService);
    }

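In ZStack, management nodes are often deployed as a cluster, with each MN managing different resources. A consistent hash ring is therefore needed to determine which MN owns a given resource.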
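The article does not show how destMaker is implemented, so the following is only a generic sketch of a consistent hash ring, not ZStack's code. NodeRing, the MD5-based hash, and the count of 100 virtual nodes per MN are all assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a consistent hash ring; not ZStack's destMaker.
class NodeRing {
    private static final int VIRTUAL_NODES = 100; // assumed number of ring points per node
    private final TreeMap<Long, String> ring = new TreeMap<>();

    void addNode(String nodeUuid) {
        for (int i = 0; i < VIRTUAL_NODES; i++) ring.put(hash(nodeUuid + "#" + i), nodeUuid);
    }

    void removeNode(String nodeUuid) {
        for (int i = 0; i < VIRTUAL_NODES; i++) ring.remove(hash(nodeUuid + "#" + i));
    }

    // Walks clockwise from the resource's hash to the first node point on the ring.
    String nodeFor(String resourceUuid) {
        if (ring.isEmpty()) throw new IllegalStateException("no management node in the ring");
        Map.Entry<Long, String> e = ring.ceilingEntry(hash(resourceUuid));
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    // Mirrors the serviceId + "." + mgmtUuid format used by makeTargetServiceIdByResourceUuid.
    String makeTargetServiceId(String serviceId, String resourceUuid) {
        return serviceId + "." + nodeFor(resourceUuid);
    }

    // Uses the first 8 bytes of an MD5 digest as the ring position.
    private static long hash(String key) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) h = (h << 8) | (d[i] & 0xff);
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The property that matters for a management-node cluster is stability: when one MN leaves the ring, only the resources it owned are reassigned, and every other resource keeps resolving to the same MN.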

CloudBus.send

    @Override
    public void send(final NeedReplyMessage msg, final CloudBusCallBack callback) {
        // give the msg a timeout
        evaluateMessageTimeout(msg);

        // an anonymous inner class extending Envelope
        Envelope e = new Envelope() {
            // guards against the callback being invoked more than once
            AtomicBoolean called = new AtomicBoolean(false);

            final Envelope self = this;
            // submit a task to the thread pool that fires when the msg times out
            TimeoutTaskReceipt timeoutTaskReceipt = thdf.submitTimeoutTask(new Runnable() {
                @Override
                public void run() {
                    self.timeout();
                }
            }, TimeUnit.MILLISECONDS, msg.getTimeout());

            // called when the reply to the msg arrives
            @Override
            public void ack(MessageReply reply) {
                // update the statistics for this msg
                count(msg);
                // remove the record for this msg's unique ID from the map
                envelopes.remove(msg.getId());
                // if the CAS fails, the callback has already been invoked; return
                if (!called.compareAndSet(false, true)) {
                    return;
                }
                // cancel the timeout task
                timeoutTaskReceipt.cancel();
                // invoke the registered callback
                callback.run(reply);
            }

            // called when the msg times out
            @Override
            public void timeout() {
                // remove the record for this msg's unique ID from the map
                envelopes.remove(msg.getId());
                // if the callback has already been invoked, return
                if (!called.compareAndSet(false, true)) {
                    return;
                }
                // build a timeout reply internally and hand it to the callback
                callback.run(createTimeoutReply(msg));
            }

            // used by getWaitingReplyMessageStatistic
            @Override
            List getRequests() {
                List requests = new ArrayList();
                requests.add(msg);
                return requests;
            }
        };

        // put the msg's unique ID and the envelope just built into the envelopes map
        envelopes.put(msg.getId(), e);
        // send the message
        send(msg, false);
    }
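The interesting part of the Envelope is the race between ack and timeout: whichever wins the compareAndSet gets to invoke the callback, and the loser returns silently. The sketch below isolates that idea using only the JDK. OneShotEnvelope, its String replies, and the TIMEOUT marker are hypothetical stand-ins, and a ScheduledExecutorService replaces ZStack's thdf.submitTimeoutTask.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical miniature of the Envelope idea; names are illustrative only.
class OneShotEnvelope {
    static final String TIMEOUT = "TIMEOUT"; // stand-in for createTimeoutReply(msg)

    private final AtomicBoolean called = new AtomicBoolean(false);
    private final Consumer<String> callback;
    private final ScheduledFuture<?> timeoutTask;

    OneShotEnvelope(ScheduledExecutorService timer, long timeoutMillis, Consumer<String> callback) {
        this.callback = callback;
        // Schedule the timeout last, after the fields it reads are initialized.
        this.timeoutTask = timer.schedule(this::timeout, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Called when a reply arrives; wins only if the timeout has not fired yet.
    void ack(String reply) {
        if (!called.compareAndSet(false, true)) return;
        timeoutTask.cancel(false);
        callback.accept(reply);
    }

    // Called by the timer; wins only if no reply has been delivered yet.
    void timeout() {
        if (!called.compareAndSet(false, true)) return;
        callback.accept(TIMEOUT);
    }
}
```

Because both paths go through the same AtomicBoolean, the callback runs exactly once even if the reply and the timeout arrive on different threads at nearly the same moment.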

The private method send

    private void send(Message msg, Boolean noNeedReply) {
        // the msg's serviceId must not be null; otherwise it cannot ...
        if (msg.getServiceId() == null) {
            throw new IllegalArgumentException(String.format("service id cannot be null: %s", msg.getClass().getName()));
        }
        // build the basic properties of the msg
        basicProperty(msg);
        // set the msg header entries
        msg.putHeaderEntry(CORRELATION_ID, msg.getId());
        // set the queue that replies should be sent to
        msg.putHeaderEntry(REPLY_TO, outboundQueue.getBindingKey());
        if (msg instanceof APIMessage) {
            // API always need reply
            msg.putHeaderEntry(NO_NEED_REPLY_MSG, Boolean.FALSE.toString());
        } else if (msg instanceof NeedReplyMessage) {
            // for NeedReplyMessage sent without requiring receiver to reply,
            // mark it, then it will not be tracked and replied
            msg.putHeaderEntry(NO_NEED_REPLY_MSG, noNeedReply.toString());
        }

        buildRequestMessageMetaData(msg);
        wire.send(msg);
    }

This function is shared logic: every message passes through here before being sent out via RabbitMQ, so it deserves a closer look.

    protected void basicProperty(Message msg) {
        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        msg.setAMQPProperties(builder.deliveryMode(1).expiration(String.valueOf(TimeUnit.SECONDS.toMillis(CloudBusGlobalProperty.MESSAGE_TTL))).build());
    }

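This function sets the basic properties of the msg: the delivery mode (1, meaning non-persistent) and the expiration time.

Next, the buildRequestMessageMetaData method: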

    private void buildRequestMessageMetaData(Message msg) {
        if (msg instanceof APIMessage || (msg instanceof NeedReplyMessage && !Boolean.valueOf((String)msg.getHeaderEntry(NO_NEED_REPLY_MSG)))) {
            RequestMessageMetaData metaData;
            if (msg instanceof LockResourceMessage) {
                LockResourceMessage lmsg = (LockResourceMessage) msg;
                LockMessageMetaData lmetaData = new LockMessageMetaData();
                lmetaData.unlockKey = lmsg.getUnlockKey();
                lmetaData.reason = lmsg.getReason();
                lmetaData.senderManagementUuid = Platform.getManagementServerId();
                metaData = lmetaData;
            } else {
                metaData = new RequestMessageMetaData();
            }

            metaData.needApiEvent = msg instanceof APIMessage && !(msg instanceof APISyncCallMessage);
            metaData.msgId = msg.getId();
            metaData.replyTo = msg.getHeaderEntry(REPLY_TO);
            metaData.timeout = msg instanceof NeedReplyMessage ? ((NeedReplyMessage) msg).getTimeout() : null;
            metaData.serviceId = msg.getServiceId();
            metaData.messageName = msg.getClass().getName();
            metaData.className = metaData.getClass().getName();
            msg.getAMQPHeaders().put(MESSAGE_META_DATA, JSONObjectUtil.toJsonString(metaData));
        }
    }

buildRequestMessageMetaData extracts the metadata the message needs from the msg and puts it, serialized as JSON, into the msg's actual AMQP headers.

Then comes wire.send:

        public void send(Message msg) {
            // for unit test finding invocation chain
            MessageCommandRecorder.record(msg.getClass());

            List<BeforeSendMessageInterceptor> interceptors = beforeSendMessageInterceptors.get(msg.getClass());
            if (interceptors != null) {
                for (BeforeSendMessageInterceptor interceptor : interceptors) {
                    interceptor.intercept(msg);

                    /*
                    if (logger.isTraceEnabled()) {
                        logger.trace(String.format("called %s for message[%s]", interceptor.getClass(), msg.getClass()));
                    }
                    */
                }
            }

            for (BeforeSendMessageInterceptor interceptor : beforeSendMessageInterceptorsForAll) {
                interceptor.intercept(msg);

                /*
                if (logger.isTraceEnabled()) {
                    logger.trace(String.format("called %s for message[%s]", interceptor.getClass(), msg.getClass()));
                }
                */
            }

            send(msg, true);
        }

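The logic is straightforward:

  1. Record the message in the invocation chain (used by unit tests).
  2. Run the before-send interceptors registered for this specific message class.
  3. Run the before-send interceptors registered for all messages.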
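The two interceptor passes can be sketched in isolation. This is not the CloudBus code: SendInterceptors and its Interceptor interface are hypothetical, and treating an empty class list as "apply to all messages" is an assumption of this sketch rather than something the article confirms.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical miniature of the per-class and global interceptor lists.
class SendInterceptors {
    interface Interceptor { void intercept(Object msg); }

    private final Map<Class<?>, List<Interceptor>> byClass = new HashMap<>();
    private final List<Interceptor> forAll = new ArrayList<>();

    // Assumption: with no classes given, the interceptor applies to every message.
    void install(Interceptor interceptor, Class<?>... classes) {
        if (classes.length == 0) {
            forAll.add(interceptor);
            return;
        }
        for (Class<?> clz : classes) {
            byClass.computeIfAbsent(clz, k -> new ArrayList<>()).add(interceptor);
        }
    }

    // Runs the class-specific interceptors first, then the global ones.
    void beforeSend(Object msg) {
        List<Interceptor> specific = byClass.get(msg.getClass());
        if (specific != null) {
            for (Interceptor i : specific) i.intercept(msg);
        }
        for (Interceptor i : forAll) i.intercept(msg);
    }
}
```

Note that the lookup uses the exact runtime class, as wire.send does with msg.getClass(), so an interceptor installed for a parent class does not fire for its subclasses in this sketch.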

Finally it calls send(msg, true):

        public void send(final Message msg, boolean makeQueueName) {
            /*
            StopWatch watch = new StopWatch();
            watch.start();
            */
            String serviceId = msg.getServiceId();
            if (makeQueueName) {
                // get the real queue name
                serviceId = makeMessageQueueName(serviceId);
            }
            // build json schema
            buildSchema(msg);
            // copy the necessary information from the current thread context;
            // this is how the UUID carried by each API call is passed along
            evalThreadContextToMessage(msg);

            if (logger.isTraceEnabled() && logMessage(msg)) {
                logger.trace(String.format("[msg send]: %s", wire.dumpMessage(msg)));
            }

            // take a channel from the channel pool
            Channel chan = channelPool.acquire();
            try {
                // explained separately below
                new RecoverableSend(chan, msg, serviceId, outboundQueue.getBusExchange()).send();
                /*
                watch.stop();
                logger.debug(String.format("sending %s cost %sms", msg.getClass().getName(), watch.getTime()));
                */
            } catch (IOException e) {
                throw new CloudRuntimeException(e);
            } finally {
                // return the channel to the channel pool
                channelPool.returnChannel(chan);
            }
        }

Now a closer look at new RecoverableSend(chan, msg, serviceId, outboundQueue.getBusExchange()).send():

private class RecoverableSend {
    Channel chan;
    byte[] data;
    String serviceId;
    Message msg;
    BusExchange exchange;

    RecoverableSend(Channel chan, Message msg, String serviceId, BusExchange exchange) throws IOException {
        data = compressMessageIfNeeded(msg);
        this.chan = chan;
        this.serviceId = serviceId;
        this.msg = msg;
        this.exchange = exchange;
    }

    void send() throws IOException {
        try {
            chan.basicPublish(exchange.toString(), serviceId,
                    true, msg.getAMQPProperties(), data);
        } catch (ShutdownSignalException e) {
            if (!(conn instanceof AutorecoveringConnection) || serverIps.size() <= 1 || !Platform.IS_RUNNING) {
                // the connection is not recoverable
                throw e;
            }

            logger.warn(String.format("failed to send a message because %s; as the connection is recoverable," +
                    "we are doing recoverable send right now", e.getMessage()));

            if (!recoverSend()) {
                throw e;
            }
        }
    }

    private byte[] compressMessageIfNeeded(Message msg) throws IOException {
        if (!CloudBusGlobalProperty.COMPRESS_NON_API_MESSAGE || msg instanceof APIEvent || msg instanceof APIMessage) {
            return gson.toJson(msg, Message.class).getBytes();
        }

        msg.getAMQPHeaders().put(AMQP_PROPERTY_HEADER__COMPRESSED, "true");
        return Compresser.deflate(gson.toJson(msg, Message.class).getBytes());
    }

    private boolean recoverSend() throws IOException {
        int interval = conn.getHeartbeat() / 2;
        interval = interval > 0 ? interval : 1;
        int count = 0;

        // as the connection is lost, there is no need to wait heart beat missing 8 times
        // so we use reflection to fast the process
        RecoveryAwareAMQConnection delegate = FieldUtils.getFieldValue("delegate", conn);
        DebugUtils.Assert(delegate != null, "cannot get RecoveryAwareAMQConnection");
        Field _missedHeartbeats = FieldUtils.getField("_missedHeartbeats", RecoveryAwareAMQConnection.class);
        DebugUtils.Assert(_missedHeartbeats!=null, "cannot find _missedHeartbeats");
        _missedHeartbeats.setAccessible(true);
        try {
            _missedHeartbeats.set(delegate, 100);
        } catch (IllegalAccessException e) {
            throw new CloudRuntimeException(e);
        }

        while (count < CloudBusGlobalProperty.RABBITMQ_RECOVERABLE_SEND_TIMES) {
            try {
                TimeUnit.SECONDS.sleep(interval);
            } catch (InterruptedException e1) {
                logger.warn(e1.getMessage());
            }

            try {
                chan.basicPublish(exchange.toString(), serviceId,
                        true, msg.getAMQPProperties(), data);
                return true;
            } catch (ShutdownSignalException e) {
                logger.warn(String.format("recoverable send fails %s times, will continue to retry %s times; %s",
                        count, CloudBusGlobalProperty.RABBITMQ_RECOVERABLE_SEND_TIMES-count, e.getMessage()));
                count ++;
            }
        }

        return false;
    }
}
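The retry loop in recoverSend above boils down to a bounded retry with a fixed pause before each attempt. The following is a minimal sketch of that pattern in plain Java, not ZStack code: the names BoundedRetry and Publisher are hypothetical, IOException stands in for ShutdownSignalException, and unlike the original the sketch gives up when the sleeping thread is interrupted.

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;

class BoundedRetry {
    /** Hypothetical stand-in for a publish call that may fail while the connection recovers. */
    interface Publisher {
        void publish() throws IOException;
    }

    /**
     * Calls publisher up to maxTimes, sleeping intervalMillis before each attempt.
     * Returns true on the first success, false when all attempts have failed.
     */
    static boolean retry(Publisher publisher, int maxTimes, long intervalMillis) {
        int count = 0;
        while (count < maxTimes) {
            try {
                TimeUnit.MILLISECONDS.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Assumption of this sketch: restore the flag and stop retrying.
                Thread.currentThread().interrupt();
                return false;
            }

            try {
                publisher.publish();
                return true;
            } catch (IOException e) {
                // The attempt failed; count it and try again.
                count++;
            }
        }
        return false;
    }
}
```

A caller would pass the real publish call as the Publisher and treat a false result the way send() does, by rethrowing the original exception.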

The core of it is this call:

chan.basicPublish(exchange.toString(), serviceId,
        true, msg.getAMQPProperties(), data);

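It publishes the message to RabbitMQ using the exchange, the routing key (here the serviceId), the message's basic properties, and the already serialized message body.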

Here is the signature of that method:

/**
 * Publish a message
 * @see com.rabbitmq.client.AMQP.Basic.Publish
 * @param exchange the exchange to publish the message to
 * @param routingKey the routing key
 * @param mandatory true if the 'mandatory' flag is to be set
 * @param props other properties for the message - routing headers etc
 * @param body the message body
 * @throws java.io.IOException if an error is encountered
 */
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
        throws IOException;

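When the mandatory flag is true and the exchange cannot find a matching queue for the message based on its own type and the message's routing key, the broker hands the message back to the producer through basic.return. When mandatory is false, the broker simply drops the message in that situation.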
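The two outcomes can be modelled with a toy in-memory exchange. This is only an illustration of the semantics described above and does not use the RabbitMQ client; ToyDirectExchange and its members are hypothetical names, queues are plain lists, and the returned list plays the role of basic.return.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ToyDirectExchange {
    // routing key -> queues bound with that key
    private final Map<String, List<List<String>>> bindings = new HashMap<>();
    // messages handed back to the producer (models basic.return)
    final List<String> returned = new ArrayList<>();

    void bind(String routingKey, List<String> queue) {
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    void publish(String routingKey, boolean mandatory, String body) {
        List<List<String>> queues = bindings.get(routingKey);
        if (queues == null || queues.isEmpty()) {
            if (mandatory) {
                // No queue matches and the flag is set: give the message back.
                returned.add(body);
            }
            // Without the flag the message is silently dropped.
            return;
        }
        for (List<String> queue : queues) {
            queue.add(body);
        }
    }
}
```

With the real client, a producer that sets mandatory to true also has to register a return listener on the channel, otherwise the returned messages go unnoticed.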

There is also an overload that takes an immediate flag:

/**
 * Publish a message
 * @see com.rabbitmq.client.AMQP.Basic.Publish
 * @param exchange the exchange to publish the message to
 * @param routingKey the routing key
 * @param mandatory true if the 'mandatory' flag is to be set
 * @param immediate true if the 'immediate' flag is to be
 * set. Note that the RabbitMQ server does not support this flag.
 * @param props other properties for the message - routing headers etc
 * @param body the message body
 * @throws java.io.IOException if an error is encountered
 */
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
        throws IOException;

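In the AMQP specification, when the immediate flag is true and the exchange finds that a target queue has no consumer while routing the message, the message is not placed in that queue. If none of the queues matching the routing key (one or more) has a consumer, the message is handed back to the producer through basic.return. As the Javadoc above notes, the RabbitMQ server does not support this flag.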

CloudBus.reply

@Override
public void reply(Message request, MessageReply reply) {
    if (Boolean.valueOf((String) request.getHeaderEntry(NO_NEED_REPLY_MSG))) {
        if (logger.isTraceEnabled()) {
            logger.trace(String.format("%s in message%s is set, drop reply%s", NO_NEED_REPLY_MSG,
                    wire.dumpMessage(request), wire.dumpMessage(reply)));
        }

        return;
    }

    AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
    reply.setAMQPProperties(builder.deliveryMode(1).build());
    reply.getHeaders().put(IS_MESSAGE_REPLY, Boolean.TRUE.toString());
    reply.putHeaderEntry(CORRELATION_ID, request.getId());
    reply.setServiceId((String) request.getHeaderEntry(REPLY_TO));

    buildResponseMessageMetaData(reply);
    if (request instanceof NeedReplyMessage) {
        callReplyPreSendingExtensions(reply, (NeedReplyMessage) request);
    }

    wire.send(reply, false);
}

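The other properties were covered earlier. reply.setServiceId((String) request.getHeaderEntry(REPLY_TO)); routes the reply through the outboundQueue like any other message, and the correlationId (the id of the request) is what lets it be matched back to the original sender.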

callReplyPreSendingExtensions can modify the reply as needed. After that comes wire.send, which was analyzed earlier.
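How the requesting side matches a reply to its request is not shown in this section. A common way to do it with a correlation id is a table of pending requests keyed by that id. The sketch below shows the idea in plain Java; ReplyCorrelator and its methods are hypothetical names and are not taken from ZStack.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class ReplyCorrelator {
    // request id -> future that the requester is waiting on
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a request and returns its id; the reply must echo it as the correlation id. */
    String register(CompletableFuture<String> future) {
        String id = UUID.randomUUID().toString();
        pending.put(id, future);
        return id;
    }

    /** Completes the waiting request; returns false if nobody is waiting on this id. */
    boolean onReply(String correlationId, String payload) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false;
        }
        future.complete(payload);
        return true;
    }
}
```

Removing the entry when the reply arrives means a duplicate or late reply with the same id is simply ignored.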

CloudBus.publish

@Override
public void publish(Event event) {
    if (event instanceof APIEvent) {
        APIEvent aevt = (APIEvent) event;
        DebugUtils.Assert(aevt.getApiId() != null, String.format("apiId of %s cannot be null", aevt.getClass().getName()));
    }

    // same idea as msgProperty earlier
    eventProperty(event);
    // build the metadata
    buildResponseMessageMetaData(event);
    // analyzed earlier
    callReplyPreSendingExtensions(event, null);

    // Call the beforeEventPublishInterceptors. This variable tracks the current
    // interceptor so it can be identified if an exception is thrown.
    BeforePublishEventInterceptor c = null;
    try {
        List<BeforePublishEventInterceptor> is = beforeEventPublishInterceptors.get(event.getClass());
        if (is != null) {
            for (BeforePublishEventInterceptor i : is) {
                c = i;
                i.beforePublishEvent(event);
            }
        }

        for (BeforePublishEventInterceptor i : beforeEventPublishInterceptorsForAll) {
            c = i;
            i.beforePublishEvent(event);
        }
    } catch (StopRoutingException e) {
        if (logger.isTraceEnabled()) {
            logger.trace(String.format("BeforePublishEventInterceptor[%s] stop publishing event: %s",
                    c == null ? "null" : c.getClass().getName(), JSONObjectUtil.toJsonString(event)));
        }

        return;
    }

    wire.publish(event);
}

Next, the wire.publish method:

public void publish(Event evt) {
    /*
    StopWatch watch = new StopWatch();
    watch.start();
    */

    buildSchema(evt);

    evalThreadContextToMessage(evt);

    if (logger.isTraceEnabled() && logMessage(evt)) {
        logger.trace(String.format("[event publish]: %s", wire.dumpMessage(evt)));
    }

    Channel chan = channelPool.acquire();
    try {
        new RecoverableSend(chan, evt, evt.getType().toString(), BusExchange.BROADCAST).send();
        /*
        watch.stop();
        logger.debug(String.format("sending %s cost %sms", evt.getClass().getName(), watch.getTime()));
        */
    } catch (IOException e) {
        throw new CloudRuntimeException(e);
    } finally {
        channelPool.returnChannel(chan);
    }
}

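Most of this is the same as send. The difference is that the event is published to the BROADCAST exchange with its Type as the routing key, and the Event class defines two categories for that Type: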

package org.zstack.header.message;

import org.zstack.header.rest.APINoSee;

public abstract class Event extends Message {
    /**
     * @ignore
     */
    @APINoSee
    private String avoidKey;

    public String getAvoidKey() {
        return avoidKey;
    }

    public void setAvoidKey(String avoidKey) {
        this.avoidKey = avoidKey;
    }

    public abstract Type getType();

    public abstract String getSubCategory();

    public static final String BINDING_KEY_PERFIX = "key.event.";

    public static enum Category {
        LOCAL,
        API,
    }

    public static class Type {
        private final String _name;

        public Type(Category ctg, String subCtg) {
            _name = BINDING_KEY_PERFIX + ctg.toString() + "." + subCtg;
        }

        @Override
        public String toString() {
            return _name;
        }

        @Override
        public int hashCode() {
            return _name.hashCode();
        }

        @Override
        public boolean equals(Object t) {
            if (!(t instanceof Type)) {
                return false;
            }

            Type type = (Type) t;
            return _name.equals(type.toString());
        }
    }
}

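These are LOCAL and API. The names make the intent fairly clear: one is used to answer an APIMsg, the other to publish local messages. To understand the details, though, we have to look at EventMaid.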
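To make the binding key concrete, here is a small sketch that rebuilds the string the same way the Type constructor above does (prefix, category, a dot, then the sub-category). BindingKeyDemo is a hypothetical class, and the sub-category values "api.event" and "host" are made up for illustration.

```java
class BindingKeyDemo {
    static final String BINDING_KEY_PREFIX = "key.event.";

    enum Category { LOCAL, API }

    /** Builds the key in the same shape as Event.Type: prefix + category + "." + sub-category. */
    static String bindingKey(Category ctg, String subCategory) {
        return BINDING_KEY_PREFIX + ctg + "." + subCategory;
    }

    public static void main(String[] args) {
        // The sub-category values below are hypothetical.
        System.out.println(bindingKey(Category.API, "api.event")); // prints key.event.API.api.event
        System.out.println(bindingKey(Category.LOCAL, "host"));    // prints key.event.LOCAL.host
    }
}
```

Because the sub-category is part of the string, there can be many distinct keys within each of the two categories.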

EventMaid
private class EventMaid extends AbstractConsumer {
    Map<String, List<EventListenerWrapper>> listeners = new ConcurrentHashMap<String, List<EventListenerWrapper>>();
    Channel eventChan;
    String queueName = makeEventQueueName(String.format("eventMaid.%s", Platform.getUuid()));

    public void construct() {
        try {
            eventChan = conn.createChannel();
            eventChan.queueDeclare(queueName, false, false, true, queueArguments());
            eventChan.basicConsume(queueName, true, this);
        } catch (IOException e) {
            throw new CloudRuntimeException(e);
        }
    }

    public void destruct() {
        try {
            eventChan.close();
        } catch (IOException e) {
            throw new CloudRuntimeException(e);
        }
    }

    public void listen(Event evt, EventListenerWrapper l) {
        String type = evt.getType().toString();
        try {
            synchronized (listeners) {
                List<EventListenerWrapper> lst = listeners.get(type);
                if (lst == null) {
                    lst = new CopyOnWriteArrayList<EventListenerWrapper>();
                    listeners.put(type, lst);
                    eventChan.queueBind(queueName, BusExchange.BROADCAST.toString(), type);
                    logger.debug(String.format("[listening event]: %s", type));
                }

                if (!lst.contains(l)) {
                    lst.add(l);
                }
            }
        } catch (IOException e) {
            throw new CloudRuntimeException(e);
        }
    }

    public void unlisten(Event evt, EventListenerWrapper l) {
        String type = evt.getType().toString();
        try {
            synchronized (listeners) {
                List<EventListenerWrapper> lst = listeners.get(type);
                if (lst == null) {
                    return;
                }

                lst.remove(l);
                if (lst.isEmpty()) {
                    listeners.remove(type);
                    eventChan.queueUnbind(queueName, BusExchange.BROADCAST.toString(), type);
                    logger.debug(String.format("[unlistening event]: %s", type));
                }
            }
        } catch (IOException e) {
            throw new CloudRuntimeException(e);
        }
    }

    @SyncThread(level = 10)
    @MessageSafe
    private void dispatch(Event evt, EventListenerWrapper l) {
        setThreadLoggingContext(evt);

        l.callEventListener(evt);
    }

    private void handle(Event evt) {
        String type = evt.getType().toString();
        List<EventListenerWrapper> lst = listeners.get(type);
        if (lst == null) {
            return;
        }

        if (logger.isTraceEnabled()) {
            logger.trace(String.format("[event received]: %s", wire.dumpMessage(evt)));
        }

        for (EventListenerWrapper l : lst) {
            dispatch(evt, l);
        }
    }

    @Override
    public void handleDelivery(String s, com.rabbitmq.client.Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
        Event evt = null;
        try {
            evt = (Event) wire.toMessage(bytes, basicProperties);
            handle(evt);
        } catch (final Throwable t) {
            final Event fevt = evt;
            throwableSafe(new Runnable() {
                @Override
                public void run() {
                    if (fevt != null) {
                        logger.warn(String.format("unhandled throwable when handling event[%s], dump: %s", fevt.getClass().getName(), wire.dumpMessage(fevt)), t);
                    } else {
                        logger.warn(String.format("unhandled throwable"), t);
                    }
                }
            });
        }
    }
}

The place to start reading this code is handleDelivery:

@Override
public void handleDelivery(String s, com.rabbitmq.client.Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
    Event evt = null;
    try {
        evt = (Event) wire.toMessage(bytes, basicProperties);
        handle(evt);
    } catch (final Throwable t) {
        final Event fevt = evt;
        throwableSafe(new Runnable() {
            @Override
            public void run() {
                if (fevt != null) {
                    logger.warn(String.format("unhandled throwable when handling event[%s], dump: %s", fevt.getClass().getName(), wire.dumpMessage(fevt)), t);
                } else {
                    logger.warn(String.format("unhandled throwable"), t);
                }
            }
        });
    }
}

As you can see, this overrides handleDelivery from the Consumer interface. Here is the Javadoc of that method:

/**
 * Called when a basic.deliver is received for this consumer.
 * @param consumerTag the consumer tag associated with the consumer
 * @param envelope packaging data for the message
 * @param properties content header data for the message
 * @param body the message body (opaque, client-specific byte array)
 * @throws IOException if the consumer encounters an I/O error while processing the message
 * @see Envelope
 */
void handleDelivery(String consumerTag,
                    Envelope envelope,
                    AMQP.BasicProperties properties,
                    byte[] body)
    throws IOException;

This is what lets an EventMaid object receive messages. Inside the try block, the bytes are converted back into an Event, which is then passed on to handle.

private void handle(Event evt) {
    // As mentioned earlier, the type string is built from the category (API or LOCAL)
    // and the sub-category.
    String type = evt.getType().toString();
    // Look up the listeners registered for this type.
    List<EventListenerWrapper> lst = listeners.get(type);
    if (lst == null) {
        return;
    }

    if (logger.isTraceEnabled()) {
        logger.trace(String.format("[event received]: %s", wire.dumpMessage(evt)));
    }

    for (EventListenerWrapper l : lst) {
        // continue to the next step
        dispatch(evt, l);
    }
}

@SyncThread(level = 10)
@MessageSafe
private void dispatch(Event evt, EventListenerWrapper l) {
    setThreadLoggingContext(evt);
    // continue to the next step
    l.callEventListener(evt);
}

@Override
public EventSubscriberReceipt subscribeEvent(final CloudBusEventListener listener, final Event... events) {
    final EventListenerWrapper wrapper = new EventListenerWrapper() {
        @Override
        public void callEventListener(Event e) {
            // Each listener runs its own handling logic; returning true unlistens.
            if (listener.handleEvent(e)) {
                maid.unlisten(e, this);
            }
        }
    };
    // Register the wrapper for each event.
    for (Event e : events) {
        maid.listen(e, wrapper);
    }

    return new EventSubscriberReceipt() {
        @Override
        public void unsubscribe(Event e) {
            maid.unlisten(e, wrapper);
        }

        @Override
        public void unsubscribeAll() {
            for (Event e : events) {
                maid.unlisten(e, wrapper);
            }
        }
    };
}

Now look at listen:

public void listen(Event evt, EventListenerWrapper l) {
    String type = evt.getType().toString();
    try {
        synchronized (listeners) {
            List<EventListenerWrapper> lst = listeners.get(type);
            if (lst == null) {
                lst = new CopyOnWriteArrayList<EventListenerWrapper>();
                listeners.put(type, lst);
                eventChan.queueBind(queueName, BusExchange.BROADCAST.toString(), type);
                logger.debug(String.format("[listening event]: %s", type));
            }

            if (!lst.contains(l)) {
                lst.add(l);
            }
        }
    } catch (IOException e) {
        throw new CloudRuntimeException(e);
    }
}

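listen first takes the lock on listeners and looks up the list for the event type. If no list exists yet, it creates a CopyOnWriteArrayList, stores it in the map, and binds the EventMaid queue to the BROADCAST exchange with the type as the binding key, so that events of this type start arriving. Because the list is copy-on-write, handle can iterate over it without holding the lock while listeners are being added or removed. Finally, the EventListenerWrapper is added to the list unless it is already there.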
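The bookkeeping in listen and unlisten (bind on the first listener of a type, unbind when the last one goes away) can be sketched without RabbitMQ. In this hypothetical ListenerRegistry, listeners are plain Runnables and the brokerCalls list stands in for eventChan.queueBind and eventChan.queueUnbind.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

class ListenerRegistry {
    private final Map<String, List<Runnable>> listeners = new ConcurrentHashMap<>();
    // Stand-in for the queueBind / queueUnbind calls on the channel.
    final List<String> brokerCalls = new ArrayList<>();

    void listen(String type, Runnable l) {
        synchronized (listeners) {
            List<Runnable> lst = listeners.get(type);
            if (lst == null) {
                // First listener for this type: create the list and bind once.
                lst = new CopyOnWriteArrayList<>();
                listeners.put(type, lst);
                brokerCalls.add("bind " + type);
            }
            if (!lst.contains(l)) {
                lst.add(l);
            }
        }
    }

    void unlisten(String type, Runnable l) {
        synchronized (listeners) {
            List<Runnable> lst = listeners.get(type);
            if (lst == null) {
                return;
            }
            lst.remove(l);
            if (lst.isEmpty()) {
                // Last listener gone: drop the entry and unbind.
                listeners.remove(type);
                brokerCalls.add("unbind " + type);
            }
        }
    }

    /** Delivery path: no lock is taken; the copy-on-write list makes iteration safe. */
    void handle(String type) {
        List<Runnable> lst = listeners.get(type);
        if (lst == null) {
            return;
        }
        for (Runnable l : lst) {
            l.run();
        }
    }
}
```

Registering the same listener twice has no effect, which matches the contains check in the original code.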

After all of this, some readers may be a little lost, so here is a short summary of the call flow around EventMaid:

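  • When each ZStack Component starts, it subscribes to events on the CloudBus.
  • When the CloudBus receives an event to publish, it delivers the event to the objects implementing CloudBusEventListener that subscribed to that event type, and each of them decides whether to handle it.
This is how CloudBus and EventFacade work together.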
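The subscribe and publish flow summarized above, including the rule that a listener returning true from handleEvent is removed, can be sketched as a tiny in-process bus. MiniEventBus is a hypothetical class, events are plain strings, and there is no broker involved.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class MiniEventBus {
    /** Same shape as CloudBusEventListener.handleEvent: return true to stop listening. */
    interface Listener {
        boolean handleEvent(String event);
    }

    private final List<Listener> listeners = new CopyOnWriteArrayList<>();

    void subscribe(Listener l) {
        listeners.add(l);
    }

    void publish(String event) {
        // Iteration runs over a snapshot, so removing during the loop is safe.
        for (Listener l : listeners) {
            if (l.handleEvent(event)) {
                listeners.remove(l);
            }
        }
    }

    int listenerCount() {
        return listeners.size();
    }
}
```

A one-shot listener simply returns true after it has seen the event it was waiting for, and a long-lived one always returns false.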

Summary

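In this article we walked through the source of the ZStack components that provide its message-driven features. The APIs of these two components are easy to use and concise, but a few points in the implementation could be improved: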

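  • handleEvent returns a boolean, and returning true cancels the listener. The meaning of that return value is not obvious from the signature.
  • listeners is a ConcurrentHashMap, so the listen method could rely on the map's own atomic operations instead of a synchronized block, which would increase throughput.
  • The values of listeners could be a Set, with CopyOnWriteArraySet replacing CopyOnWriteArrayList. The listen method only adds l when lst does not already contain it, which shows that the entries are meant to be unique.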
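The last two suggestions can be sketched together as follows. This is one possible shape, not a patch for ZStack: SetBasedRegistry is a hypothetical class, listeners are Runnables, and the boolean results only mark the points where a queue bind or unbind would be needed.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

class SetBasedRegistry {
    private final Map<String, Set<Runnable>> listeners = new ConcurrentHashMap<>();

    /** Returns true when this call created the entry, i.e. where a queue bind would happen. */
    boolean listen(String type, Runnable l) {
        boolean[] created = {false};
        // compute runs atomically for the given key, so no synchronized block is needed.
        listeners.compute(type, (k, set) -> {
            if (set == null) {
                set = new CopyOnWriteArraySet<>();
                created[0] = true;
            }
            set.add(l); // a Set ignores duplicates, so no contains check
            return set;
        });
        return created[0];
    }

    /** Returns true when the last listener was removed, i.e. where an unbind would happen. */
    boolean unlisten(String type, Runnable l) {
        boolean[] removedEntry = {false};
        listeners.computeIfPresent(type, (k, set) -> {
            set.remove(l);
            if (set.isEmpty()) {
                removedEntry[0] = true;
                return null; // returning null removes the map entry
            }
            return set;
        });
        return removedEntry[0];
    }

    int count(String type) {
        Set<Runnable> set = listeners.get(type);
        return set == null ? 0 : set.size();
    }
}
```

One caveat: the map update is atomic per key, but a broker call made after it is not part of that atomic step, so a real implementation would still have to decide how bind and unbind are ordered under concurrent calls.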

Reprinted from: http://yxfgx.baihongyu.com/
