無限風域

Apache Spot



Warning: getimagesize(/www/wwwroot/blog.windawings.com/go/p34FmMs2/): failed to open stream: No such file or directory in /www/wwwroot/blog.windawings.com/usr/plugins/AMP/Action.php on line 521

Warning: getimagesize(/www/wwwroot/blog.windawings.com/go/KV4AKXiq/): failed to open stream: No such file or directory in /www/wwwroot/blog.windawings.com/usr/plugins/AMP/Action.php on line 521

[RUBYTOC]

<iframe width="560" height="315" src="//www.youtube.com/embed/lqdSicFKsWg" frameborder="0" gesture="media" allow="encrypted-media" allowfullscreen class="youtube"></iframe>

[引言]^(Preface)


  Apache Spot由Intel和Cloudera向Apache基金会贡献, 其前身为Intel在Cloudera平台上开发的开源专案Open Network Insight(ONI), 采用了Open Data Model(ODM)规格化安全资料。Spot基于高效的流量与数据包分析和机器学习, 提供识别云环境下的潜在安全威胁和未知的网络攻击的能力。考虑其使用规格化的安全数据集, 未来具有开源共享安全数据集提高机器识别能力的潜质。   目前Spot支持NetFlow, sFlow, DNS和Proxy的网络流量分析, 以HDFS和Hive提供存储, 由Spark提供计算, 基于LDA算法提供无监督机器学习(unsupervised)以及Jupyter提供图形化支持。   Spot尚处于孵化阶段(Incubate), 由Centrify, Cloudera, Cybraics, Endgame, Intel, Jask, Streamsets, Webroot等厂商投入开发与贡献。 # [介绍]^(Introduction)
## [优势]^(Advantages)   Apache Spot基于以上四种网络流量进行分析, 其实是三种, 它把网站流量划分了内外网两种, 另外两个分为代理流量和DNS. 值得一提的是, 官网上表示需要一天后才初具功能, 也就是需要初始的训练集来进行机器学习。
- Telemetry: 计量数据采集, 这里指Flows, DNS和Proxy的抓包和日志记录 - Perimeter Flows: 周边流量, 即外网流量(相对于内网流量) ## [机制]^(Works)   Apache Spot通过使用机器学习来识别流量特征, 并标识出各个流量簇的唯一行为。其主要对网络流量采用富文集, 噪声过滤, 白名单和启发式算法的方式进行处理,从而生成可能造成安全威胁的名单。
### [并行框架]^(Parallel Ingest Framework)   目前其采用kafka和flume对流量数据批量加载到HDFS和Hive中, 数据由多种格式存储以便于搜索, 机器学习, 转移到法律执行过程模型或者导入其他系统。 #### Kafka
  kafka分布式发布订阅消息系统(基于zookeeper, scala和java编写, 原LinkedIn开发), 可通过Hadoop的并行加载机制统一线上和离线消息处理, kafka可把消息种子(feed)分成多个[主题]^(topic), 比如spot-ingest划分出来的flow, dns和proxy三个主题. 每个[消息]^(record)由一个key, 一个value和时间戳构成。
##### [生产者]^(Producer)   生产者可[发布]^(push)消息到(append, 顺序写磁盘, 经验证, 顺序写磁盘效率比随机写内存还要高, 这是Kafka高吞吐率的一个很重要的保证)一个或多个主题中的[分区]^(partition)里(即发布到broker集群里该消息主题分区所在的leader中(通过查询zookeeper/brokers/..../state节点找到leader), 分区选择(即消息路由)由key进行hash运算, 若指定了分区则直接路由, 若key和分区都未指定, 则使用round-robin). producer.type默认为sync同步push消息, 为async异步时, 生产者可以以batch形式push消息, 提高broker性能的同时消息失去可靠性。 ##### [消费者]^(Consumer)   消费者可订阅一个或多个主题, 从broker(一个kafka节点为一个broker, 一个或多个broker组成一个kafka集群)[拉取]^(pull)已发布的消息, 一条消息只有一个消费者处理。可创建[消费者组]^(consumer group)作为逻辑上的订阅者, 组内多个消费者可拓展性能和容错。组内消费者共享Group ID, 与主题一一对应; 主题有多个分区, 一个分区可由组内的一个消费者消费, 分区是消费的最小单位; 一般来说, 分区数可大于等于组内消费者数, 多的消费者永远也无法分配到可供消费的分区, 但多个分区可由同一个消费者消费。当需求多线程消费者实例时, 需要生产者也以批量模式发布消息到broker的随机分区中(若分区不够随机, 其他线程的消费者也无法消费到自己以外的分区); 消费者会提交自己的消费[偏移量]^(offset), 0.8版本偏移量由broker管理(提交到zookeeper节点/consumers//offsets//), 0.9版本后由消费者组管理(提交到\_\_consumeroffsets topic摆脱对zookeeper的依赖, offsets.topic.num.partitions设置其分区数, 默认50, 分区位置由Group Id的哈希值对其求余)。 ##### [协调者]^(Coordinator)   0.8版本的协调者借助zookeeper对消费者组进行管理, 协调者监听zk节点变化, 消费者自己决定分区分配方案并抢占注册; 0.9以后版本则每个消费者组分配一个协调者, 消费者组的第一个消费者启动后向Kafka Server确定其组的协调者, 之后则与协调者进行协调通信。协调者监听主题和消费者组判断是否做rebalance(一组协议, 规定消费者组基于range, round-robin或一个待开发的新分配器分配分区(分配在消费者组 leader完成), 有三种触发条件: 组成员变更; 订阅主题数变更; 订阅主题分区数变更。有五个协议处理协调者问题: Heartbeat, LeaveGroup, JoinGroup: 消费者 → 协调者; SyncGroup: 消费者组 leader → 协调者 → 消费者组 memeber; DescribeGroup:显示组的所有信息, 包括成员信息, 协议名称, 分配方案, 订阅信息等)。一个消费者组对应一个协调者, 分区leader所在broker即为被选定的协调者, 每次rebalance产生新的rebalance generation(递增整数)。消费者组有五种状态: Dead(组内没有任何成员的最终状态, 组的元数据被协调者移除, 请求结果:UNKNOWN\_MEMBER\_ID); Empty(组内无成员但是位移信息还没有过期, 能响应JoinGroup); PreparingRebalance(组准备开启新的rebalance, 等待成员加入); AwaitingSync(等待消费者组 leader将分配方案传给各成员); Stable(rebalance完成, 可开始进行消费)。 ##### [分区]^(Partition)   每个主题下存在多个分区, 分区是一个不可变的顺序消费队列, 分区中存在一个唯一表示的偏移量, 该偏移量由唯一消费者控制, 而各个消费者间的偏移量互不影响。分区使得一个主题可分布到broker的多个服务器中, 同时自身可拷贝多份作为备份容错。既然涉及到备份容错, 则分区由leader和0...N个follower存储, leader处理该分区所有读写请求, follower备份数据, 从而使broker通过分区容错(主题配置[复制因子]^(replication facto)为N, 即可允许N - 1个服务器宕机而不丢失已提交的消息)。 ##### [分段]^(Segment)   分区由多个分段组成, 分段由.index索引文件和.log数据文件组成。.index记录消息序号和物理地址偏移量, 第一个消息序号为1, 偏移量为0, 是该分区全局的第一个消息, .index文件名由其上一个分段的.index文件的最后一个消息的物理偏移量确定, 这样消费者使用偏移量(.index文件名数值+消息序号)即可通过线性复杂度查找, 根据对应分段的.index文件, 再通过.index的索引位置找到对应数据。(消费者通过3712的偏移量, 找到<$i$\>.index ($3712 \\le i \\le$ <下一个.index文件名 \>), 找到里面序号为<$i$ - 3712\>的索引, 获得索引值, 即物理偏移量, 即可根据物理偏移量获取到此消息) ##### AR ISR OSR $$\\begin{equation}\\begin{split} \\text{AR(assigned replicas所有副本)} &= \\text{ISR(in-sync replicas副本同步队列, 由分区leader维护的列表)}\\\\ &  +\\text{ OSR(outof-sync replicas超时副本同步队列,}\\\\ &    \\text{由超过replica.lag.time.max.ms和新加入的follower加入)} \\end{split}\\end{equation}$$ ##### LEO HW   取一个分区对应ISR中最小的LEO(log end offset, 分区最后一个消息的偏移量)作为HW(hight water mark高水位, leader和follower都有自己的HW), 消费者能获取到的最新消息即为[高水位]^(HW)所在的消息, 内部broker的读取请求则没有高水位限制。 ##### Controller   kafka集群中的一个broker将会被选举为Controller负责分区管理和副本状态管理, zookeeper中/brokers/topics//partitions//state对ISR进行管理, 该节点由Controller和Leader共同维护。 分区改变leader, 则Controller发起LeaderAndIsrRequest通知所有replicas; ISR变化, 则由leader更新该节点的ISR信息。当ISR中副本的LEO不一致时, 如果此时leader挂掉, 选举新的leader时并不是按照LEO的高低进行选举而是按照ISR中的顺序选举。 ##### Acks Min-Insync-Relicas Unclean-Leader-Election-Enable $$\\begin{cases} request.required.acks = -1\\text{时和min.insync.replicas配合使用, 表示生产者需要等待ISR中所有follower的ack才确认消息已被接收;}\\\\ request.required.acks = 0\\text{时, 生产者不等待ack确认; 为1时只等待leader的ack(leader宕机数据丢失);}\\\\ \\begin{cases} request.required.acks = -1\\\\ min.insync.replicas = 2\\\\ unclean.leader.election.enable = false\\\\ AR = 3 \\end{cases}\\text{时, 则ISR中任意一个broker宕机, 则消息只能读取, 都宕机, 则该replicas(即该分区)失去可用性。} \\end{cases}$$ ##### Delivery Guarantee   a) At most once: 消息可能会丢,但绝不会重复传输;   b) At least once: 消息绝不会丢,但可能会重复传输 (目前kafka采用的生产者发布消息的形式, 引起消息去重问题, 要求消息具备幂等性, 设置GUID(globally unique identifier)标记消息, 在客户端中做去重则要求集中式缓存(redis, memcached...));   c) Exactly once: 每条消息肯定会被传输一次且仅传输一次。 #### Flume
  flume可分布式日志收集系统(java1.6+编写, 由Cloudera开发, 2009年捐赠给Apache基金会, Hadoop相关组件)将不同源([扇入]^(fan in))的海量日志进行收集,提供对数据的简单处理, 并写到不同目的地([扇出]^(fan out))。flume由原来的OG版本到现在NG版本, 进行了架构重构,两个版本互不兼容。经过架构重构后, NG版本演变为了一个轻量工具, 适应各种方式收集日志, 支持failover和负载均衡。
##### [事件]^(Event)   事件是flume内部数据传输的基本单元, 也是事务的基本单元。event由可选headers(key-value键值对, 不同的source生成不同的header, 允许修改event添加header, 需要拦截器和选择器机制的支持从而对event进行路由)和body(字节数组, 事件的实际内容)组成(只支持如下格式:\[{header:{"key": "value"}, body:"context"},\])。事件由client产生, 经由client → agent{source(netcat, http...) → channel(memory, jdbc, file...) → sink(hdfs, hbase, null...)} → destination的路线到达目的地, 在送达sink前需要在channel缓存event, 数据达到sink后才会删除channel中的缓存,而在event从源到目的的迁移的抽象成为[流]^(flow)。 ##### [数据源]^(Source)
  source从client接收数据, 并把数据以event的格式临时存放在一个或多个channel中, 直到event被sink成功发送到destination后, channel才将临时数据删除。在Spooling Directory Source中, 被发送过的数据被标记.COMPLETED(后缀名, 可自定义). source支持的类型如下: |**类型**|**说明**|**实现类**| |--|--|--| |Avro|支持 Avro协议(实际上是Avro RPC), AvroLegacySource(0.9.x OG版本兼容), 内置支持|AvroSource, AvroLegacySource | |Thrift|支持Thrift 协议, ThriftLegacySource(0.9.x OG版本兼容), 内置支持|ThriftLegacySource | |Exec|基于Unix Command在标准输出上生产数据|ExecSource| |JMS|从JMS系统(消息, 主题)中读取数据, ActiveMQ已测|| |Spooling Directory|监控指定目录内数据变更|| |Netcat|监听端口(官网上只写了tcp type), 将流经端口的每一个文本行数据作为event输入|NetcatSource| |Twitter 1% firehose|通过API持续下载Twitter数据(experimental)|| |Sequence Generator|序列生成器数据源, 生产序列数据|SequenceGeneratorSource| |Syslog|读取syslog数据产生Event, 支持UDP和TCP两种协议|SyslogTcpSource, SyslogUDPSource| |HTTP|基于HTTP POST或GET方式的数据源, 支持 JSON和BLOB 表示形式|| |ScribeSource||ScribeSource | |StressSource|主要用于测试, 由连续的具有相同payload的Event构成, 不用于生产环境|StressSource| |(custom type as FQCN) |自定义Source|(custom FQCN) | ##### [管道]^(Channel)   Channel是一个临时的存储容器, 它把从source接收到的数据以event格式存储起来, 直到event被sink消费后才把临时数据删除。Channel可和多个source和sink连接, 成为source和sink的桥梁, 提供数据收发的一致性保证。常见类型为Memory, JDBC和File. Memory提供高速吞吐但无法保证数据的完整性, File通过持久化数据至磁盘保证数据的完整性。 |**类型**|**说明**|**实现类**| |--|--|--| |Memory|存储在内存中, 高速但非持久化的管道|MemoryChannel| |File|可读写、映射和操作文件的管道, 存储Event在磁盘中|FileChannel| |JDBC|基于JDBC的可持久化的管道, 内置支持Derby|JDBCChannel| |Recoverable Memory|基于本地文件存储系统提供持久化的管道(据说建议用FileChannel代替)|RecoverableMemoryChannel| |Spillable Memory |存储在内存和磁盘中, 内存队列满则持久化到磁盘上(Experimental)|| |PseudoTxnMemory|测试用Channel, 不用于生产环境|PseudoTxnMemoryChannel| |(custom type as FQCN) |自定义Channel|(custom FQCN) | ##### [水槽]^(Sink)
  Sink从channel中读取并删除event, 将event传递到Hbase、HDFS或flow pipeline的下一个agent等。Sink类型如下: |**类型**|**说明**|**实现类**| |--|--|--| |HDFS|写入所有Event到HDFS, 支持Rolling, Bucketing, HDFS-200 Append等|HDFSEventSink| |Hbase|从Channel读取Event并写入Hbase(从Async的实现类来看似乎可以异步)|HBaseSink, AsyncHBaseSink| |Logger|基于配置好的日志子系统以info日志级别记录events(默认log4j)|LoggerSink| |Avro|对所有Event使用预置的Avro协议发送到配置的RPC端口,当同配置Avro Source时, 形成分层的集合|AvroSink| |File Roll |存储在内存和磁盘中, 内存队列满则持久化到磁盘上(Experimental)|RollingFileSink | |IRC|event在IRC回放|IRCSink| |Null|写入/dev/null, 即丢弃所有Event|NullSink| |Morphline Solr|写入Solr搜索服务器(Clustered)|| |ElasticSearch|写入Elastic Search搜索服务器(Clustered)|ElasticSearchSink| |Thrift|写入Thrift|ThriftSink| |Kite Dataset|写入到Kite Dataset(Experimental)|| |(custom type as FQCN) |自定义Sink|(custom FQCN) | ##### [拦截器]^(Interceptor)
来源于 http://blog.csdn.net/ty_laurel/article/details/54585726
  拦截器用于source与channel之间对event数据进行处理, 主要对event的header进行CUD, 拦截器有如下类型: |**类型**|**说明**|**实现类**| |--|--|--| |Host|加入Agent主机名或IP地址|HostInterceptor$Builder| |Timestamp|加入当前时间戳(ms)|TimestampInterceptor$Builder| |Static|加入一组静态的[键值对]^(key-value)|StaticInterceptor$Builder| |Regex Filter |对Event Body中做正则提取出需要的匹配项|RegexFilteringInterceptor$Builder| |UUID|加入UUID|| |Morphline|Cloudera开源ETL(ElasticSearch, Logstash, Kibana)框架(目前由Kite主导开发), 富配置转化链, Grok正则解析|| |(custom type as FQCN) |自定义Interceptor|(custom FQCN) |   关于Morphline框架在Flume中的作用, 由以下两个图展示:
来源于 https://allthingshadoop.com/2014/05
来源于 http://vinoyang.com/2015/11/20/build-log-sys-with-flume-and-morphline
##### [管道选择器]^(Channel Selector) |**类型**|**说明**|**实现类**| |--|--|--| |Replicating |把从Source接收到的所有Event发往所有的Channel(default)|ReplicatingChannelSelector| |multiplexing |根据Event Header中的Key选择管道|MultiplexingChannelSelector| |(custom type as FQCN) |自定义Channel Selector|(custom FQCN) | ##### [水槽处理器]^(Sink Processor)   Sink Processtor可用于激活sinks中特定sink用于[负载均衡]^(load balance)。 |**类型**|**说明**|**实现类**| |--|--|--| |Default|单独的Sink|DefaultSinkProcessor| |Failover |维护Sinks优先级列表, 保证只要有一个Sink即可处理Event(故障转移)(若没有设置优先级则按声明顺序)|FailoverSinkProcessor| |Load Balance|维护Active Sinks索引列表, 选择机制有Round Robin(default)和Random, 也可以自定义继承AbstractSinkSelector|LoadBalancingSinkProcessor| |(custom type as FQCN) |自定义Sink Processor|(custom FQCN) | ##### [事件序列器]^(Event Serializer)   Event需要对其Body进行序列化, 主要有以下类型: EventSerializer$Builder(Text, Avro Event), EventSerializer(SimpleHbaseEventSerializer, SimpleAsyncHbaseEventSerializer, RegexHbaseEventSerializer), HbaseEventSerializer(HbaseSink的自定义序列化), AsyncHbaseEventSerializer(AsyncHbaseSink的自定义序列化), EventSerializer$Builder(除了HbaseSink和AsyncHbaseSink外其他Sink的自定义序列化)。 #### HDFS
  Hadoop[分布式文件系统]^(distributed fileSystem)为Hadoop(java, 前yahoo主导)核心子项目, 基于流式数据访问模式处理超大文件需求开发, 适用于一次写入多次读取的应用场景, 不适合低延迟数据访问需求, 小文件存储, 并发写入(不允许多线程同时写入同一文件)和随机修改(仅支持数据[追加]^(append))。主要有Client(调用HDFS API, 从NameNode获取文件元数据与DataNode交互数据读写), NameNode(元数据节点, 管理元数据, 分配数据存储节点), DataNode(数据存储节点, 负责数据存储读写与冗余备份)。此项目主要采用Spark计算和Hive, Impala的查询, MapReduce使用不明显, 暂时不写MapReduce, ResourceManager(Yarn), TaskTracker以及JobTrackter计算任务框架。 - RPC通信 - 可构建在廉价机器上 - 提供统一抽象目录树 - 高容错(默认3副本, 由[机架感知]^(rack-aware)副本在DN本地存一份, 同机架存一份, 不同机架存一份, 每个DN最多存一个副本, 每个机架最多存两个副本) - 2.x版本后提供高可用性, 对NN配置secondary NN, 根据checkpoint机制合并NN上的editslog(记录元数据操作)到fsimage(磁盘元数据镜像, NN宕机时快速恢复元数据用) - [分块]^(block)存储(2.x版本默认128M, 此前为64M), 有DN承担, DN发送心跳告知NN[存活]^(alive)状态, 也汇报自身block信息(自身数据校验不通过block的不汇报) - NN启动先进入安全模式, 当NN确认一定百分比的block安全(block副本数达到最小值为副本安全)的30s后, NN退出安全模式可进行block备份操作 - 数据访问模式: client(可配置数据副本数与block大小) → NN(检查, 取数据元数据和block分布(由距离client远近排序), 返回FSDataOutputStream, FSDataInputStream 对象), client(通过NN返回的对象与DN交互) → DN(读写数据)
#### Hive
  Hive(java, Facebook)是构建在Hadoop上的数据仓库框架, 使用HQL(类SQL语法(不支持主键外键, 可创建索引), 转化SQL为分布式作业, 支持MapReduce(最稳), Spark或Tez)对数据执行CRUD, 提供流式API, 适用于传统数据仓库业务, 不适用于低延迟的交互访问。Hive几乎是Hadoop上的SQL标准, 适合离线ETL与大数据离线Ad-hoc查询, 以及特大规模数据集合精准结果的查询。对于需要交互式的Ad-hoc查询方案, 通常选择Impala, Presto等。
##### [组件]^(Compenent)
- UI: 用户向系统提交查询和其他操作的接口, 目前有Cli, Hive Web Server提供 - Driver: 处理查询, 实现会话交互, 提供基于JDBC/ODBC接口上的执行与获取的API - MetaStore: 存储hive元数据([表格]^(table)元数据, [分区]^(partition)元数据, [桶]^(bucket)元数据...)到关系型数据库(支持MySQL, Derby...) - Compiler: 基于[语义分析]^(semantic analyzer)解析查询块和查询语句, 最终通过metastore中查询表和分区的元数据, 生成可供MapReduce, Spark或Tez执行的执行计划。HQL [———→]^(Antlr) [抽象语法树]^(AST) [——————→]^(semantic analyzer) [最简查询块]^(QB)(from子句递归生成一个QB) [——————→]^(logical plan) OP Tree(有向无环图) [——————→]^(logical optimizer) OP Tree([谓词下推]^(predicate push down), [分区剪裁]^(partition prunner), [关联排序]^(join reorder)) [——————→]^(physical plan) Task Tree(遇到分发则切一刀生成MapReduce作业, 如group by, join, distributed by, distinct...etc, 分出多个子图, 每个子图构成一个节点, 节点连成执行图) [——————→]^(physical optimizer) Task Tree(基于输入选择路径, 增加备份作业etc)(该过程可通过explain + statement查看) - ExecutionEngine: 通过管理不同阶段性计划中的依赖, 在对应的系统中(HDFS)执行Compiler生成的基于阶段性DAG的执行计划, 新版本有多种实现(MapReduce, Spark或Tez)
来源于 http://www.codedata.cn/hacknews/146917894602293594
##### [数据模型]^(Data Model) - Table: 类似传统关系型数据库的表, 能进行过滤, 映射, Join和Union. Table中的数据全部存储在HDFS的一个目录下。支持[外表]^(external table), 即table可由HDFS中已存在的文件或目录创建。Column, Row等概念与传统关系型数据库类似 - Partition: 分区基于HDFS子目录实现, 每个Table可根据partition key决定数据的存储方式。分区是表中部分列的集合。分区可以减少每次遍历的数据量改善性能。但HDFS不支持大量的子目录, 所以需要考虑分区数量避免过多子目录创建。有静态分区和动态分区: 静态分区在创建表时使用partitioned by定义(创建时需要指定column和数据类型, [加载数据]^(load data)时需要指定具体为该column的哪个值); 动态分区由设置set hive.exec.dynamic.partition=true;加载数据时不需要指定column的值, hive.exec.dynamic.partition.mode默认值为strick, 即不允许所有分区都是动态的, 如果需要父目录及其子目录全部为动态分区, 需要指定该参数为nostrick - Bucket(or Cluster): 每个分区可进一步被分桶, 分桶通过对指定column进行哈希实现(hash结果对桶数求余, 保证每个bucket都有数据), 在数据量足够大时, 分桶比分区有更高的查询效率。需要设置hive.enforce.bucketiong=true;使用关键字clustered by指定column和分桶数
来源于 http://www.codedata.cn/hacknews/146917894602293594
##### [数据类型]^(Data Type) |**类型**|**支持**|**说明**| |--|--|--| |TinyInt|1-byte signed integer, -128 ~ 127|后缀Y| |SmallInt|2-byte signed integer, -32,768 ~ 32,767|后缀S| |Int/Integer(默认)|4-byte signed integer, -2147483648 ~ 2147483647|| |BigInt|8-byte signed integer, -9223372036854775808 ~ 9223372036854775807|后缀L, 大于BIGINT的值需要使用BD后缀和Decimal(38,0)处理| |Float|4-byte single precision floating point number|| |Double|8-byte double precision floating point number|| |Decimal|在Hive 0.11.0介绍 (HIVE-2693), 在Hive 0.13.0改进 (HIVE-3976)|| |TimeStamp|Hive 0.8.0及其以上支持|支持传统的 UNIX 时间戳和可选的纳秒精度| |Ddate|Hive 0.12.0及其以上支持|| |Interval|Hive 1.2.0及其以上支持|| |String|可用单双引号表示, 以C风格转义|| |VarChar|1 ~ 65535, 超过varchar(length)中length的长度的字串将被截断, 尾部空格影响比较结果|| |Char|1~255, 固定长度, 字串比指定长度短的部分由空格代替, 尾部空格不影响比较结果|| |Boolean|true or false|| |Binary|Hive 0.8.0及其以上支持|| |Array|由相同类型元素组成, 下标从0开始|| |Map|[键值对]^(key-value)|| |Struct|可包含不同类型元素, dot(.)符号访问语法|| |Union|Hive 0.7.0及其以上支持, 可综合以上数据类型|据说还没有完全支持,官方建议只用于查看|   复杂类型的使用方法: ```sql create table employees ( name string, salary float, subordinates array, deductions map, address struct ) partitioned by (country string, state string); ``` ##### [优化]^(Optimization)   Hive中最重要的部分是Group By和Join, MapReduce就是Group By或者Join的过程。 - Group By: 可在执行SQL前设置hive.map.aggr = true, 使Map端发往Reduce端前部分聚合减少数据量; 也可以设置hive.groupby.skewindata = true把一个MR job转化为两个, 第一道MR job输出结果随机分发Reduce端, 每个Reduce端按Group By的key部分聚合, 第二道MR job根据预处理结果按Group By的key分布到Reduce端完成最终聚合。二者原理都是通过部分聚合减少Reduce端的数据量, 而聚合的有效性和效率通常与UDAF(user-defined aggregation funcation)有关, 且只对代数聚合函数(count, sum...)有效, 对整体聚合函数(avg, mean...)无效 - Join: 应该尽可能把分布均匀的表放在左边, 把倾斜表放在右边; 通过使用MAPJOIN(<小表名>)把小表打成一个哈希表序列化文件的压缩包, 通过分布式缓存均匀分发到各个job执行节点上, 然后在节点解压在内存中完成关联(此处没有用到Reduce, 也不存在数据倾斜), 默认情况下小表 $\\le$ 25M, MAPJOIN通常不会手动设置, 而是通过Hive中的physical optimizer把join优化为auto map join, 通过给Task设置一个conditional task把任务划分为$\\le$ 25M和$\\gt$ 25M, 新版Hive中默认hive.auto.convert.join = true #### Impala
  Impala(c++)在Cloudera受Google dremel启发下开发的实时SQL查询引擎(分布式大规模并行处理(MPP)数据库引擎), 功能上与shark(依赖Hive)和Drill(apache)类似。它脱离了Hive中MR批处理的缓慢, 采用与商用并行关系数据库类似的由planner, coordinator, exec engine组成的分布式查询引擎降低延迟。
Impala特性: - 基于内存, 对内存依赖大 - 不再转化为MR - C++编写, 运行时代码生成LLVM IR(low level virtual machine intermediate representation)提高效率 - 完全依赖于Hive, 直接使用Hive的元数据, 兼容HQL, 具有数据仓库特性, 可对Hive数据进行查询, 稳定性低于Hive - 支持data local, 列式存储, JDBC/ODBC查询以及SQL92标准, 拥有自己的解析器和优化器 - Parquet列式存储 ##### Impala Daemon   Impala Daemon是一个在集群的每个DN上运行的守护进程(impalad), 主要接收client, hue, jdbc/odbc请求, 执行query并返回给协调节点, 负责与statestore通信汇报节点状态。在2.9+版本中, 可以分配集群中coordinator和exec engine角色给不同主机从而提高高并发负载的可伸缩性。impalad包含了planner, coordinator, exec engine, 一般impalad与DN在同一节点(data local)。当某个分发执行的impalad失败时, 整个计划任务都返回失败, 不过再次提交一次查询也没有多少消耗。 - Query Planner: 解析SQL为执行计划 - Query Coordinator: 指定查询的主节点(某个impalad), 通知其他节点主节点的信息, 主节点待其他节点查询结果返回给它后, 主节点再返回给中心协调节点 - Query Exec Engine: 做查询工作   Impalad的查询执行分为frontend(java, 以JNI嵌入impalad, 生成查询计划)和backend(c++, 执行查询)。Frontend先生成单机查询计划(与关系型数据库执行计划相同, 查询优化方法类似), 后生成分布式查询计划(减少数据移动, 数据与计算尽量放一起)。
  上图为三张表join后做聚合再排序取topN的例子。Impala查询优化器支持代价模型(以表和分区为基数, 每列distinct值个数做统计估算执行计划代价, 从而生成较优执行计划)。可以看到上图左侧则为frontend生成的单击查询计划, 进而转换为6个segment(彩色无边框圆角矩形背景, 每个segment可由单个主机独立执行, 为计划子树)的分布式查询计划。Impala支持表广播(把join中一个表广播到相关节点, 如图t3)和哈希重分布(根据join字段哈希值重新分布两张表数据, 如图t1, t2)两种分布式join方式。分布式查询计划中的聚合函数先对本地数据进行分组聚合(如图Pre-Agg)降低数据量和数据重分布, 再把上一步的结果在汇总聚合(如图MergeAgg)计算出最终结果, topN计划过程同理。BackEnd从frontend接收plan segment执行, 执行性能优化方面, 有向量执行(getNext处理一批数据, 多个操作符做pipeline), LLVM IR, IO本地化以及parquet列式存储。 ##### Impala Statestore   Impala Statestore负责收集各个impalad进程的资源信息, 各节点健康情况以及同步节点信息。只需要在一个主机上运行这样的一个进程即可(statestored), 如果某个impalad因为硬件故障, 网络错误, 软件问题或其他原因脱机, statestore会通知其他impalad避免向不可访问节点发出查询请求。statestore在impala集群中并不是一个关键进程, 如果statestore未运行或无法访问, impalad照常继续运行并分配工作, 相对的集群会缺少健壮性。 ##### Impala Catalog   Impala Catalog(Impala 1.2+)把Impala表的元数据分法到各个impalad中, 实现DDL。当impalad节点插入或查询时, impalad把自己的操作结果通知statestore, 之后statestore通知catalogd更新元数据信息。所以一般把catalogd和statestored放在同一个主机上运行, 且该主机不再运行impalad提供查询服务, 避免集群管理出现问题。 ##### Impala文件格式 |**类型**|**格式**|**压缩**|**Create**|**Insert**| |--|--|--|--|--| |Parquet|Structured|Snappy
GZIP|✔|✔| |Text|Unstructured|LZO|✔
如果建表时没有指定存储类型,默认采用未压缩的 text,字段由 ASCII 编码的 0x01 字符串分割|✔
如果使用了 LZO 压缩,则只能通过 Hive 建表和插入数据| |Avro|Structured|Snappy
GZIP
Deflate
BZIP2|✔
Impala 1.4.0+支持,之前的版本只能通过 Hive 来建表。|✖
只能通过 LOAD DATA 的方式将已经转换好格式的数据加载进去,或者使用 Hive 来插入数据| |RCFile|Structured|Snappy
GZIP
Deflate
BZIP2|✔|✖
只能通过 LOAD DATA 的方式将已经转换好格式的数据加载进去,或者使用 Hive 来插入数据| |SequenceFile|Structured|Snappy
GZIP
Deflate
BZIP2|✔|✖
只能通过 LOAD DATA 的方式将已经转换好格式的数据加载进去,或者使用 Hive 来插入数据| ##### Impala压缩编码 |**类型**|**说明**| |--|--| |Snappy|推荐编码, 压缩率和压缩速度有很好的平衡性, snappy压缩速度很快, 但比gzip压缩率低, Impala不支持snappy压缩的text file| |GZIP|压缩率高, Impala不支持gzip格式的text file| |Deflate|Impala不支持deflate压缩的text file| |BZIP2|Impala不支持bzip2压缩的text file| |LZO|只用于text file, Impala可查询lzo压缩的text格式数据表但不支持insert, 只能通过Hive完成insert| #### Spark ### [机器学习]^(Machine Learning) #### Scala ### [操作分析]^(Operational Analytics) ## [开放数据模型]^(Open Data Model) ## [用例]^(UseCase) # [分析]^(Analysis)
## [模块]^(Module) ### Spot-Ingest
  Spot-Ingest基于分布式架构, 使用spot-collector守护进程针对不同采集源产生数据时, 通过kafka, spark streaming, hive和hdfs提供近99.99999%的数据完整性。 #### Spot-Collector   Spot-Collector基于文件系统在后台监视文件系统的新文件。当网络工具产生新文件或较早产生的数据留在监视路径上时, collectors使用解析工具(比如nfdump和tshark)转化为可读格式, 并把其原始格式存储到Hadoop用于取证, 以avro-parquet格式存储在Hive中以便做SQL查询。$\\gt$ 1MB的文件提供其文件名和hdfs路径给kafka, $\\lt$ 1MB的文件提供其data event给kafka并在之后由spark streaming做进一步处理(在其github上描述是proxy的pipeline才用spark streaming处理, 不知道一般proxy数据是否是$\\lt$ 1MB的)。 ##### Spot-Ingest Kafka
  从图上看是生成flow, dns和proxy三个主题, 各主题分区数由spot-worker的数量决定, spot-collectors作为提供者传输数据到kafka存储, spot-worker作为消费者消费kafka中三个主题的数据。 ##### Spot-Worker   Spot-Worker作为后台守护进程订阅指定的kafka主题和分区, 并在特定的Hive表中读取解析存储数据, 该数据在将来由ML算法消费。当前有两种worker, 通过定义的解析器多线程处理数据的python worker和使用spark-streaming context(micro batching)执行spark应用处理来自kafka数据的spark-streaming worker. ### Spot-ML
  Spot-ML包含执行可疑连接的例程, 分析采集自网络中的netflow, dns或proxy日志。通过分析一系列网络事件, 生成一个最不可能和最可疑的事件列表, 该过程依赖于spot-ingest来加载netflow, dns和proxy记录。它使用[主题建模]^(topic model)来发现正常和异常行为, 把IP关联的日志集合作为文档, 并使用Latent Dirichlet Allocation(LDA)来发现这些文档集合中隐藏的语义结构。Spot-ML为每个IP地址的网络行为提供[概率模型]^(probabilistic model), 即赋予每个网络日志条目被该模型一个估算的概率(或得分), 得分较低的事件被标记为可疑以便进一步分析。   `LDA基于三层的贝叶斯模型, 是一个用于离散(discrete)数据的生成概率模型(generative probabilistic model), 例如文本全集。在这个模型中文档的每个单词(word)都是由一组基础主题混合生成的。LDA在网络流量中, 通过聚合和离散将网络日志条目转换为单词。这样, 文档对应IP地址, 日志条目的单词(与一个IP地址相关)和主题对应公共网络活动的概要文档。` ### Spot-OA ### Spot-Setup ## [环境]^(Environment)
## [演示]^(Demo) # [参考]^(Reference) ---   \[01\]  _kafka 基础知识梳理_, Go_小易, 2017-08-14 17:57   \[02\]  _kafka 学习笔记:知识点整理_, cyfonly, 2016-10-12 22:13   \[03\]  _kafka 数据可靠性深度解读_, 朱小厮, 2017-05-02 19:19   \[04\]  _Kafka 消费组 (consumer group)_, heidsoft, 2017-10-20 09:53   \[05\]  _Flume NG 基本架构及原理_, dantezhao, 2016-09-14 21:52   \[06\]  _Flume 架构以及应用介绍_, 安静的技术控, 2016-05-31 12:35   \[07\]  _日志系统之 Flume 采集加 morphline 解析_, yanghua, 2015-11-20   \[08\]  _flume 拦截器及问题解决_, ty\_laurel, 2017-01-17 18:50   \[09\]  _【漫画解读】HDFS 存储原理_, 雪飘飘, 2016-02-22 13:50   \[10\]  _深刻理解 HDFS 工作机制_, Pickle, 2017-01-11 08:59   \[11\]  _Apache Hive Design_, Administrator, 2015-11-08   \[12\]  _杨卓荦:Hive 原理及查询优化_, anand, 2016-07-07   \[13\]  _Impala(多图手机用户慎入,理论 + 实践)_, SET, 2016-09-21 23:33   \[14\]  _怎么理解 impala(impala 工作原理是什么)_, 邱明成, 2017-02-12 09:28   \[15\]  _Apache Spot_, 2016-03-29 `To be Continued...`

当前页面是本站的「Baidu MIP」版。查看和发表评论请点击:完整版 »