Adhesive框架系列文章--Mongodb数据服务模块实

系统 1523 0

Mongodb数据服务可以直接接受任何类型数据,并且它设计为可以承受大量数据的写入。为了能保存任何类型的数据,并且在后台可以查看任何类型的数据,因此我们必须在收到数据的时候对数据的元数据进行提取,随同主体数据一并保存在数据库中。对数据本身也需要重新组织结构,相当于进行一次序列化,然后保存到数据库中。虽然Mongodb是支持Json格式的,但是由于我们在保存数据的时候还有很多逻辑,因此我们必须手动进行这个工作。其实对于提交数据来说,应该是一个非常快的动作,应该以异步方式进行,在一个尽量短的时间内让方法的调用可以返回,之后可以在后台慢慢进行数据的转换和数据发送到远端。因此,开发了一个内存队列服务模块来进行异步队列处理工作,并且提交数据到远端也使用了框架内部的Wcf分布式服务模块。当然,在服务端道理也一样,我们可以通过一个内存队列来批量提交数据,并且让服务的调用尽快返回。Mongodb数据服务提交数据的过程如下:

image

项目的结构如下:

image

1、Mongodb项目是客户端部分的接口

2、Mongodb.Imp项目是客户端部分的实现

3、Mongodb.Server是服务端部分的接口,或者说是服务契约

4、Mongodb.Server.Imp是服务端部分的实现

可以看到Mongodb数据本身依赖应用程序信息中心模块、配置服务模块、内存队列服务模块、Wcf分布式服务模块,对于大部分客户端应用程序来说都应该只依赖Mongodb数据服务的客户端而不是服务端。我们把Mongodb数据服务分成两部分,插入数据的服务和查询服务,后者的使用者一般而言只有Mongodb数据服务的后台。本文主要介绍前者:

    
      public
    
    
      interface
    
     IMongodbInsertService : IDisposable

    {

        
    
      void
    
     Insert(
    
      object
    
     item);

    }
  

从接口本身来看非常简单,只有一个方法。我们来看看它的实现步骤:

1、调用配置服务,查看这个数据类型对应的配置,说到这里,让我们来看一下Mongodb数据服务客户端的配置:

        [ConfigEntity(FriendlyName = 
    
      "Mongodb客户端配置"
    
    )]

    
    
      public
    
    
      class
    
     MongodbServiceConfigurationEntity

    {

        [ConfigItem(FriendlyName = 
    
      "插入服务配置项列表"
    
    )]

        
    
      public
    
     Dictionary<
    
      string
    
    , MongodbInsertServiceConfigurationItem> MongodbInsertServiceConfigurationItems { get; set; }

    }
  

每一个类型的配置项如下:

        [ConfigEntity(FriendlyName = 
    
      "Mongodb客户端针对每个数据类型的配置"
    
    )]

    
    
      public
    
    
      class
    
     MongodbInsertServiceConfigurationItem

    {

        [ConfigItem(FriendlyName = 
    
      "类型完整名"
    
    )]

        
    
      public
    
    
      string
    
     TypeFullName { get; set; }



        [ConfigItem(FriendlyName = 
    
      "是否提交到服务端"
    
    )]

        
    
      public
    
    
      bool
    
     SubmitToServer { get; set; }



        [ConfigItem(FriendlyName = 
    
      "队列最大项数"
    
    )]

        
    
      public
    
    
      int
    
     MaxItemCount { get; set; }



        [ConfigItem(FriendlyName = 
    
      "消费的线程总数"
    
    )]

        
    
      public
    
    
      int
    
     ConsumeThreadCount { get; set; }



        [ConfigItem(FriendlyName = 
    
      "消费数据的时间间隔毫秒"
    
    )]

        
    
      public
    
    
      int
    
     ConsumeIntervalMilliseconds { get; set; }



        [ConfigItem(FriendlyName = 
    
      "遇到错误时消费数据的时间间隔毫秒"
    
    )]

        
    
      public
    
    
      int
    
     ConsumeIntervalWhenErrorMilliseconds { get; set; }



        [ConfigItem(FriendlyName = 
    
      "消费数据的批量项数"
    
    )]

        
    
      public
    
    
      int
    
     ConsumeItemCountInOneBatch { get; set; }



        [ConfigItem(FriendlyName = 
    
      "达到最大项数后的策略"
    
    )]

        
    
      public
    
     MemoryQueueServiceReachMaxItemCountAction ReachMaxItemCountAction { get; set; }



        [ConfigItem(FriendlyName = 
    
      "消费数据时不足批次数的策略"
    
    )]

        
    
      public
    
     MemoryQueueServiceNotReachBatchCountConsumeAction NotReachBatchCountConsumeAction { get; set; }



        [ConfigItem(FriendlyName = 
    
      "消费数据遇到错误的策略"
    
    )]

        
    
      public
    
     MemoryQueueServiceConsumeErrorAction ConsumeErrorAction { get; set; }



        
    
      public
    
     MongodbInsertServiceConfigurationItem()

        {

            TypeFullName = 
    
      ""
    
    ;

            SubmitToServer = 
    
      true
    
    ;

            ReachMaxItemCountAction = MemoryQueueServiceReachMaxItemCountAction.AbandonOldItems

                .Add(MemoryQueueServiceReachMaxItemCountAction.LogExceptionEveryOneSecond);

            ConsumeErrorAction = MemoryQueueServiceConsumeErrorAction.AbandonAndLogException;

            ConsumeThreadCount = 1;

            ConsumeIntervalMilliseconds = 10;

            ConsumeIntervalWhenErrorMilliseconds = 1000;

            ConsumeItemCountInOneBatch = 100;

            NotReachBatchCountConsumeAction = MemoryQueueServiceNotReachBatchCountConsumeAction.ConsumeAllItems;

            MaxItemCount = 10000;

        }

    }
  

这里可以看到,除了是否提交到服务端这个配置,大多数的配置其实是内存队列服务的配置,在之后的文章中我们单独会介绍内存队列服务。之所以需要为Mongodb数据服务的客户端设置这样的配置,一方面是允许修改队列服务的配置,另一方面是为了限制没有经过配置随便什么数据都往服务端发送,只有在后台显式配置的数据类型,才会发生到服务端。

2、如果没获取到配置的话返回,如果获取到配置的话,则为这个类型初始化内存队列服务,设置一系列队列服务的参数,并且把队列的处理委托挂载我们提交数据到服务端的处理方法。换句话说是每一个类型都会有自己的内存队列服务,我们在MongodbInsertService的实现定义了一个静态字典用于保存内存队列服务的实现:

    
      private
    
    
      static
    
     Dictionary<
    
      string
    
    , IMemoryQueueService> submitDataMemoryQueueServices = 
    
      new
    
     Dictionary<
    
      string
    
    , IMemoryQueueService>();
  
    
      if
    
     (!submitDataMemoryQueueServices.ContainsKey(typeFullName))

                {

                    
    
      lock
    
     (submitDataMemoryQueueServices)

                    {

                        
    
      if
    
     (!submitDataMemoryQueueServices.ContainsKey(typeFullName))

                        {

                            var memoryQueueService = LocalServiceLocator.GetService<IMemoryQueueService>();

                            memoryQueueService.Init(
    
      new
    
     MemoryQueueServiceConfiguration(
    
      string
    
    .Format(
    
      "{0}_{1}"
    
    , ServiceName, typeFullName), InternalSubmitData)

                            {

                                ConsumeErrorAction = config.ConsumeErrorAction,

                                ConsumeIntervalMilliseconds = config.ConsumeIntervalMilliseconds,

                                ConsumeIntervalWhenErrorMilliseconds = config.ConsumeIntervalWhenErrorMilliseconds,

                                ConsumeItemCountInOneBatch = config.ConsumeItemCountInOneBatch,

                                ConsumeThreadCount = config.ConsumeThreadCount,

                                MaxItemCount = config.MaxItemCount,

                                NotReachBatchCountConsumeAction = config.NotReachBatchCountConsumeAction,

                                ReachMaxItemCountAction = config.ReachMaxItemCountAction,

                            });

                            submitDataMemoryQueueServices.Add(typeFullName, memoryQueueService);

                        }

                    }

                }
  

3、然后会判断是否已经提取过这个类型元数据了,如果没提取过则尝试提取元数据并加入缓存:

    
      if
    
     (!mongodbDatabaseDescriptionCache.ContainsKey(typeFullName))

                {

                    
    
      lock
    
     (mongodbDatabaseDescriptionCache)

                    {

                        
    
      if
    
     (!mongodbDatabaseDescriptionCache.ContainsKey(typeFullName))

                        {

                            MongodbDatabaseDescription mongodbDatabaseDescription = GetMongodbDatabaseDescription(item);

                            CheckMongodbDatabaseDescription(mongodbDatabaseDescription);

                            mongodbDatabaseDescriptionCache.Add(typeFullName, mongodbDatabaseDescription);

                        }

                    }

                }
  

4、把数据加入队列,等待队列服务在合适的时候调用处理方法(也就是发送到服务端):

    
      if
    
     (config.SubmitToServer)

                {

                    submitDataMemoryQueueServices[typeFullName].Enqueue(item);

                }
  

 

其实到这里为止,方法已经返回了,之后就是队列服务在后台的异步调用了。现在我们来深入一下细节,首先看一下GetMongodbDatabaseDescription是如何提取元数据的,这个方法返回的是MongodbDatabaseDescription,它的定义如下:

        [DataContract(Namespace = 
    
      "Adhesive.Mongodb"
    
    )]

    
    
      public
    
    
      class
    
     MongodbDatabaseDescription

    {

        [DataMember]

        
    
      public
    
    
      bool
    
     SentToServer { get; set; }



        [DataMember]

        
    
      public
    
    
      string
    
     TypeFullName { get; set; }



        [DataMember]

        
    
      public
    
    
      string
    
     DatabasePrefix { get; set; }



        [DataMember]

        
    
      public
    
    
      string
    
     CategoryName { get; set; }



        [DataMember]

        
    
      public
    
    
      string
    
     Name { get; set; }



        [DataMember]

        
    
      public
    
    
      string
    
     DisplayName { get; set; }



        [DataMember]

        
    
      public
    
    
      int
    
     ExpireDays { get; set; }



        [DataMember]

        
    
      public
    
     List<MongodbColumnDescription> MongodbColumnDescriptionList { get; set; }



        [DataMember]

        
    
      public
    
     List<MongodbEnumColumnDescription> MongodbEnumColumnDescriptionList { get; set; }

    }
  

在这里可以看到,我们主要解析的是MongodbPersistenceEntityAttribute,对于下一级的MongodbColumnDescriptionList ,我们主要是解析每一个列的元数据,而MongodbEnumColumnDescriptionList则提取所有枚举的信息。MongodbColumnDescription的定义如下:

        [DataContract(Namespace = 
    
      "Adhesive.Mongodb"
    
    )]

    
    
      public
    
    
      class
    
     MongodbColumnDescription

    {

        [DataMember]

        
    
      public
    
    
      string
    
     Name { get; set; }



        [DataMember]

        
    
      public
    
    
      string
    
     TypeName { get; set; }



        [DataMember]

        
    
      public
    
    
      bool
    
     IsArrayColumn { get; set; }



        [DataMember]

        
    
      public
    
    
      bool
    
     IsEntityColumn { get; set; }



        [DataMember]

        
    
      public
    
    
      string
    
     ColumnName { get; set; }



        [DataMember]

        
    
      public
    
    
      string
    
     DisplayName { get; set; }



        [DataMember]

        
    
      public
    
    
      string
    
     Description { get; set; }



        [DataMember]

        
    
      public
    
    
      bool
    
     ShowInTableView { get; set; }



        [DataMember]

        
    
      public
    
    
      bool
    
     IsTableColumn { get; set; }



        [DataMember]

        
    
      public
    
    
      bool
    
     IsTimeColumn { get; set; }



        [DataMember]

        
    
      public
    
    
      bool
    
     IsContextIdentityColumn { get; set; }



        [DataMember]

        
    
      public
    
    
      bool
    
     IsPrimaryKey { get; set; }



        [DataMember]

        
    
      public
    
     MongodbIndexOption MongodbIndexOption { get; set; }



        [DataMember]

        
    
      public
    
     MongodbFilterOption MongodbFilterOption { get; set; }



        [DataMember]

        
    
      public
    
     MongodbCascadeFilterOption MongodbCascadeFilterOption { get; set; }



        [DataMember]

        
    
      public
    
     MongodbSortOption MongodbSortOption { get; set; }

    }
  

这里很多数据都来自MongodbPersistenceItemAttribute和MongodbPresentationItemAttribute。再来看看MongodbEnumColumnDescription:

        [DataContract(Namespace = 
    
      "Adhesive.Mongodb"
    
    )]

    
    
      public
    
    
      class
    
     MongodbEnumColumnDescription

    {

        [DataMember]

        
    
      public
    
    
      string
    
     Name { get; set; }



        [DataMember]

        
    
      public
    
     Dictionary<
    
      string
    
    , 
    
      string
    
    > EnumItems { get; set; }



    }
  

它就简单了,只是保存枚举的列名,和枚举每一项的数据。其实这些元数据提取本身没什么复杂的,可以想到是反射提取,并且其中还涉及到递归,需要深入每一个自定义类型,GetMongodbColumnDescription方法其中有一段这样的代码实现了递归:

    
      if
    
     (!type.Assembly.GlobalAssemblyCache && type != pi.DeclaringType)

                {

                    columnDescription.IsEntityColumn = 
    
      true
    
    ;

                    var properties = GetPropertyListFromCache(type);

                    
    
      if
    
     (properties != 
    
      null
    
    )

                    {

                        
    
      foreach
    
     (var property 
    
      in
    
     properties)

                        {

                            GetMongodbColumnDescription(typeFullName, fullName, columnDescriptionList, enumColumnDescriptionList, property);

                        }

                    }

                }
  

在提取元数据的时候,另一个重要的工作是缓存一些关键的PropertyInfo的配置,以便后期处理数据的时候使用:

    
      internal
    
    
      class
    
     ProperyInfoConfig

    {

        
    
      public
    
    
      bool
    
     IsCascadeFilterLevelOne { get; set; }



        
    
      public
    
    
      bool
    
     IsCascadeFilterLevelTwo { get; set; }



        
    
      public
    
    
      bool
    
     IsCascadeFilterLevelThree { get; set; }



        
    
      public
    
    
      bool
    
     IsDateColumn { get; set; }



        
    
      public
    
    
      bool
    
     IsTableName { get; set; }



        
    
      public
    
    
      bool
    
     IsIgnore { get; set; }



        
    
      public
    
    
      string
    
     ColumnName { get; set; }

    }
  

因为我们在提交数据之前,需要针对级联下拉的数据进行处理,把第二级的值设置为第一级的值加上第二级的值,第三级的值设置为一加二加三,这样在筛选的时候就会很方便;此外还需要替换列名,计算表名等等,只有缓存了PropertyInfo才能无需重新读取元数据:

    
      private
    
    
      static
    
     Dictionary<
    
      string
    
    , Dictionary<PropertyInfo, ProperyInfoConfig>> propertyConfigCache = 
    
      new
    
     Dictionary<
    
      string
    
    , Dictionary<PropertyInfo, ProperyInfoConfig>>();
  

之前说了元数据提取部分时的逻辑,然后来看一下格式化数据时的逻辑,之前为内存队列服务的提交数据的委托挂载的方法主要实现如下:

      var mongodbDataList = items.Select(_ => ConvertItemToMongodbData(_)).Where(_ => _ != 
    
      null
    
    ).ToList();

                var desc = mongodbDatabaseDescriptionCache[typeFullName];

                WcfServiceLocator.GetSafeService<IMongodbServer>().SubmitData(mongodbDataList, desc.SentToServer ? 
    
      null
    
     : desc);
  

先是获取要提交的数据,然后再获取元数据,如果有的话和主体数据一并提交到服务端。通过Wcf分布式数据服务获取到IMongodbServer,并调用它的SubmitData方法,定义如下:

    [OperationContract]

        
    
      void
    
     SubmitData(IList<MongodbData> dataList, MongodbDatabaseDescription databaseDescription);
  

MongodbData的定义如下:

        [DataContract(Namespace = 
    
      "Adhesive.Mongodb"
    
    )]

    
    
      public
    
    
      class
    
     MongodbData

    {

        [DataMember]

        
    
      public
    
    
      string
    
     TypeFullName { get; set; }



        [DataMember]

        
    
      public
    
    
      string
    
     DatabaseName { get; set; }



        [DataMember]

        
    
      public
    
    
      string
    
     TableName { get; set; }



        [DataMember]

        
    
      public
    
    
      string
    
     Data { get; set; }

    }
  

在这里可以发现Data是字符串类型,那是因为我们把要提交的数据主体转换成了Json,否则我们是无法通过Wcf提交Dictionary<string, object>构成的一颗无限级树的。在这里,我们略去介绍ConvertItemToMongodbData的实现,它其实并不复杂,也是通过递归和反射无限级获取类的所有属性的值,并转换为Dictionary<string, object>,只不过在这里面需要处理列表类型、字典类型以及枚举。

 

至此为止,客户端的部分介绍完了,现在我们来看一下服务端部分。首先,服务端也有根据每一个类型的配置:

        [ConfigEntity(FriendlyName = 
    
      "Mongodb服务端针对每个数据类型的配置"
    
    )]

    
    
      public
    
    
      class
    
     MongodbServerConfigurationItem

    {

        [ConfigItem(FriendlyName = 
    
      "类型完整名"
    
    )]

        
    
      public
    
    
      string
    
     TypeFullName { get; set; }



        [ConfigItem(FriendlyName = 
    
      "服务器名"
    
    )]

        
    
      public
    
    
      string
    
     MongodbServerUrlName { get; set; }



        [ConfigItem(FriendlyName = 
    
      "是否提交到数据库"
    
    )]

        
    
      public
    
    
      bool
    
     SubmitToDatabase { get; set; }



        [ConfigItem(FriendlyName = 
    
      "队列最大项数"
    
    )]

        
    
      public
    
    
      int
    
     MaxItemCount { get; set; }



        [ConfigItem(FriendlyName = 
    
      "消费的线程总数"
    
    )]

        
    
      public
    
    
      int
    
     ConsumeThreadCount { get; set; }



        [ConfigItem(FriendlyName = 
    
      "消费数据的时间间隔毫秒"
    
    )]

        
    
      public
    
    
      int
    
     ConsumeIntervalMilliseconds { get; set; }



        [ConfigItem(FriendlyName = 
    
      "遇到错误时消费数据的时间间隔毫秒"
    
    )]

        
    
      public
    
    
      int
    
     ConsumeIntervalWhenErrorMilliseconds { get; set; }



        [ConfigItem(FriendlyName = 
    
      "消费数据的批量项数"
    
    )]

        
    
      public
    
    
      int
    
     ConsumeItemCountInOneBatch { get; set; }



        [ConfigItem(FriendlyName = 
    
      "达到最大项数后的策略"
    
    )]

        
    
      public
    
     MemoryQueueServiceReachMaxItemCountAction ReachMaxItemCountAction { get; set; }



        [ConfigItem(FriendlyName = 
    
      "消费数据时不足批次数的策略"
    
    )]

        
    
      public
    
     MemoryQueueServiceNotReachBatchCountConsumeAction NotReachBatchCountConsumeAction { get; set; }



        [ConfigItem(FriendlyName = 
    
      "消费数据遇到错误的策略"
    
    )]

        
    
      public
    
     MemoryQueueServiceConsumeErrorAction ConsumeErrorAction { get; set; }



        
    
      public
    
     MongodbServerConfigurationItem()

        {

            TypeFullName = 
    
      ""
    
    ;

            SubmitToDatabase = 
    
      true
    
    ;

            ReachMaxItemCountAction = MemoryQueueServiceReachMaxItemCountAction.AbandonOldItems

                .Add(MemoryQueueServiceReachMaxItemCountAction.LogExceptionEveryOneSecond);

            ConsumeErrorAction = MemoryQueueServiceConsumeErrorAction.AbandonAndLogException;

            ConsumeThreadCount = Environment.ProcessorCount;

            ConsumeIntervalMilliseconds = 10;

            ConsumeIntervalWhenErrorMilliseconds = 1000;

            ConsumeItemCountInOneBatch = 100;

            NotReachBatchCountConsumeAction = MemoryQueueServiceNotReachBatchCountConsumeAction.ConsumeAllItems;

            MaxItemCount = 100000;

        }

    }
  

这个配置和客户端的配置差不多,只不过这里把是否提交到服务端改为了是否提交到数据库。在获取了配置之后,同样把数据提交到内存队列,然后由内存队列提交到数据库。核心代码如下:

    
      try
    
    

            {

                var sw = Stopwatch.StartNew();



                var server = CreateMasterMongoServer(typeFullName);

                
    
      if
    
     (server != 
    
      null
    
    )

                {

                    var database = server.GetDatabase(item.DatabaseName);

                    var collection = database.GetCollection(item.TableName);

                    var documentList = 
    
      new
    
     List<BsonDocument>();

                    JavaScriptSerializer s = 
    
      new
    
     JavaScriptSerializer();

                    mongodbDataList.ForEach(i =>

                    {

                        var dic = s.DeserializeObject(i.Data) 
    
      as
    
     IDictionary;

                        var document = 
    
      new
    
     BsonDocument().Add(dic);

                        documentList.Add(document);

                    });



                    collection.InsertBatch(documentList);

                    LocalLoggingService.Debug(
    
      "Mongodb服务端成功服务提交 {0} 条数据到数据库,类型是 '{1}',耗时 {2} 毫秒"
    
    , documentList.Count, typeFullName, sw.ElapsedMilliseconds);

                }



            }

            
    
      catch
    
     (Exception ex)

            {

                AppInfoCenterService.ExceptionService.Handle(ex, categoryName: ServiceName, subcategoryName: typeFullName, description: 
    
      "写入数据出现错误"
    
    , extraInfo: 
    
      new
    
     ExtraInfo

                {

                    DisplayItems = 
    
      new
    
     Dictionary<
    
      string
    
    , 
    
      string
    
    >()

                    {

                        {
    
      "DatabaseName"
    
     , item.DatabaseName}, 

                        {
    
      "TableName"
    
    , item.TableName}

                    }

                });

            }

        }
  

首先是Json反序列化获取到数据,然后转换为BsonDocument,最后批量提交到数据库中。

本文介绍了Mongodb数据服务的插入数据部分在客户端和服务端之间的逻辑,下一篇将介绍Mongodb数据服务查询数据的部分。

Adhesive框架系列文章--Mongodb数据服务模块实现(上)


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论