windawings

Apache Spot
#tree{min-height:20px;padding:19px;margin-bottom:20px;bo...
扫描右侧二维码阅读全文
11
2018/02

Apache Spot

目录(Index)

引言(Preface)


  Apache Spot由Intel和Cloudera向Apache基金会贡献, 其前身为Intel在Cloudera平台上开发的开源专案Open Network Insight(ONI), 采用了Open Data Model(ODM)规格化安全资料。Spot基于高效的流量与数据包分析和机器学习, 提供识别云环境下的潜在安全威胁和未知的网络攻击的能力。考虑其使用规格化的安全数据集, 未来具有开源共享安全数据集提高机器识别能力的潜质。
  目前Spot支持NetFlow, sFlow, DNS和Proxy的网络流量分析, 以HDFS和Hive提供存储, 由Spark提供计算, 基于LDA算法提供无监督机器学习(unsupervised)以及Jupyter提供图形化支持。
  Spot尚处于孵化阶段(Incubate), 由Centrify, Cloudera, Cybraics, Endgame, Intel, Jask, Streamsets, Webroot等厂商投入开发与贡献。

介绍(Introduction)


优势(Advantages)

  Apache Spot基于以上四种网络流量进行分析, 其实是三种, 它把网站流量划分了内外网两种, 另外两个分为代理流量和DNS. 值得一提的是, 官网上表示需要一天后才初具功能, 也就是需要初始的训练集来进行机器学习。

advantages
  • Telemetry: 计量数据采集, 这里指Flows, DNS和Proxy的抓包和日志记录
  • Perimeter Flows: 周边流量, 即外网流量(相对于内网流量)

机制(Works)

  Apache Spot通过使用机器学习来识别流量特征, 并标识出各个流量簇的唯一行为。其主要对网络流量采用富文集, 噪声过滤, 白名单和启发式算法的方式进行处理,从而生成可能造成安全威胁的名单。

advantages 2

并行框架(Parallel Ingest Framework)

  目前其采用kafka和flume对流量数据批量加载到HDFS和Hive中, 数据由多种格式存储以便于搜索, 机器学习, 转移到法律执行过程模型或者导入其他系统。

Kafka

kafka logo

  kafka分布式发布订阅消息系统(基于zookeeper, scala和java编写, 原LinkedIn开发), 可通过Hadoop的并行加载机制统一线上和离线消息处理, kafka可把消息种子(feed)分成多个主题(topic), 比如spot-ingest划分出来的flow, dns和proxy三个主题. 每个消息(record)由一个key, 一个value和时间戳构成。

kafka diagram
生产者(Producer)

  生产者可发布(push)消息到(append, 顺序写磁盘, 经验证, 顺序写磁盘效率比随机写内存还要高, 这是Kafka高吞吐率的一个很重要的保证)一个或多个主题中的分区(partition)里(即发布到broker集群里该消息主题分区所在的leader中(通过查询zookeeper/brokers/..../state节点找到leader), 分区选择(即消息路由)由key进行hash运算, 若指定了分区则直接路由, 若key和分区都未指定, 则使用round-robin). producer.type默认为sync同步push消息, 为async异步时, 生产者可以以batch形式push消息, 提高broker性能的同时消息失去可靠性。

消费者(Consumer)

  消费者可订阅一个或多个主题, 从broker(一个kafka节点为一个broker, 一个或多个broker组成一个kafka集群)拉取(pull)已发布的消息, 一条消息只有一个消费者处理。可创建消费者组(consumer group)作为逻辑上的订阅者, 组内多个消费者可拓展性能和容错。组内消费者共享Group ID, 与主题一一对应; 主题有多个分区, 一个分区可由组内的一个消费者消费, 分区是消费的最小单位; 一般来说, 分区数可大于等于组内消费者数, 多的消费者永远也无法分配到可供消费的分区, 但多个分区可由同一个消费者消费。当需求多线程消费者实例时, 需要生产者也以批量模式发布消息到broker的随机分区中(若分区不够随机, 其他线程的消费者也无法消费到自己以外的分区); 消费者会提交自己的消费偏移量(offset), 0.8版本偏移量由broker管理(提交到zookeeper节点/consumers//offsets//), 0.9版本后由消费者组管理(提交到__consumeroffsets topic摆脱对zookeeper的依赖, offsets.topic.num.partitions设置其分区数, 默认50, 分区位置由Group Id的哈希值对其求余)。

协调者(Coordinator)

  0.8版本的协调者借助zookeeper对消费者组进行管理, 协调者监听zk节点变化, 消费者自己决定分区分配方案并抢占注册; 0.9以后版本则每个消费者组分配一个协调者, 消费者组的第一个消费者启动后向Kafka Server确定其组的协调者, 之后则与协调者进行协调通信。协调者监听主题和消费者组判断是否做rebalance(一组协议, 规定消费者组基于range, round-robin或一个待开发的新分配器分配分区(分配在消费者组 leader完成), 有三种触发条件: 组成员变更; 订阅主题数变更; 订阅主题分区数变更。有五个协议处理协调者问题: Heartbeat, LeaveGroup, JoinGroup: 消费者 → 协调者; SyncGroup: 消费者组 leader → 协调者 → 消费者组 memeber; DescribeGroup:显示组的所有信息, 包括成员信息, 协议名称, 分配方案, 订阅信息等)。一个消费者组对应一个协调者, 分区leader所在broker即为被选定的协调者, 每次rebalance产生新的rebalance generation(递增整数)。消费者组有五种状态: Dead(组内没有任何成员的最终状态, 组的元数据被协调者移除, 请求结果:UNKNOWN_MEMBER_ID); Empty(组内无成员但是位移信息还没有过期, 能响应JoinGroup); PreparingRebalance(组准备开启新的rebalance, 等待成员加入); AwaitingSync(等待消费者组 leader将分配方案传给各成员); Stable(rebalance完成, 可开始进行消费)。

分区(Partition)

  每个主题下存在多个分区, 分区是一个不可变的顺序消费队列, 分区中存在一个唯一表示的偏移量, 该偏移量由唯一消费者控制, 而各个消费者间的偏移量互不影响。分区使得一个主题可分布到broker的多个服务器中, 同时自身可拷贝多份作为备份容错。既然涉及到备份容错, 则分区由leader和0...N个follower存储, leader处理该分区所有读写请求, follower备份数据, 从而使broker通过分区容错(主题配置复制因子(replication facto)为N, 即可允许N - 1个服务器宕机而不丢失已提交的消息)。

分段(Segment)

  分区由多个分段组成, 分段由.index索引文件和.log数据文件组成。.index记录消息序号和物理地址偏移量, 第一个消息序号为1, 偏移量为0, 是该分区全局的第一个消息, .index文件名由其上一个分段的.index文件的最后一个消息的物理偏移量确定, 这样消费者使用偏移量(.index文件名数值+消息序号)即可通过线性复杂度查找, 根据对应分段的.index文件, 再通过.index的索引位置找到对应数据。(消费者通过3712的偏移量, 找到<$i$>.index ($3712 \le i \le$ <下一个.index文件名 >), 找到里面序号为<$i$ - 3712>的索引, 获得索引值, 即物理偏移量, 即可根据物理偏移量获取到此消息)

AR ISR OSR

$$\begin{equation}\begin{split}
\text{AR(assigned replicas所有副本)} &= \text{ISR(in-sync replicas副本同步队列, 由分区leader维护的列表)}\\
&  +\text{ OSR(outof-sync replicas超时副本同步队列,}\\
&    \text{由超过replica.lag.time.max.ms和新加入的follower加入)}
\end{split}\end{equation}$$

LEO HW

  取一个分区对应ISR中最小的LEO(log end offset, 分区最后一个消息的偏移量)作为HW(hight water mark高水位, leader和follower都有自己的HW), 消费者能获取到的最新消息即为高水位(HW)所在的消息, 内部broker的读取请求则没有高水位限制。

Controller

  kafka集群中的一个broker将会被选举为Controller负责分区管理和副本状态管理, zookeeper中/brokers/topics//partitions//state对ISR进行管理, 该节点由Controller和Leader共同维护。 分区改变leader, 则Controller发起LeaderAndIsrRequest通知所有replicas; ISR变化, 则由leader更新该节点的ISR信息。当ISR中副本的LEO不一致时, 如果此时leader挂掉, 选举新的leader时并不是按照LEO的高低进行选举而是按照ISR中的顺序选举。

Acks Min-Insync-Relicas Unclean-Leader-Election-Enable

$$\begin{cases}
request.required.acks = -1\text{时和min.insync.replicas配合使用, 表示生产者需要等待ISR中所有follower的ack才确认消息已被接收;}\\
request.required.acks = 0\text{时, 生产者不等待ack确认; 为1时只等待leader的ack(leader宕机数据丢失);}\\
\begin{cases}
request.required.acks = -1\\
min.insync.replicas = 2\\
unclean.leader.election.enable = false\\
AR = 3
\end{cases}\text{时, 则ISR中任意一个broker宕机, 则消息只能读取, 都宕机, 则该replicas(即该分区)失去可用性。}
\end{cases}$$

Delivery Guarantee

  a) At most once: 消息可能会丢,但绝不会重复传输;
  b) At least once: 消息绝不会丢,但可能会重复传输 (目前kafka采用的生产者发布消息的形式, 引起消息去重问题, 要求消息具备幂等性, 设置GUID(globally unique identifier)标记消息, 在客户端中做去重则要求集中式缓存(redis, memcached...));
  c) Exactly once: 每条消息肯定会被传输一次且仅传输一次。

Flume

flume logo

  flume可分布式日志收集系统(java1.6+编写, 由Cloudera开发, 2009年捐赠给Apache基金会, Hadoop相关组件)将不同源((fan)(in))的海量日志进行收集,提供对数据的简单处理, 并写到不同目的地((fan)(out))。flume由原来的OG版本到现在NG版本, 进行了架构重构,两个版本互不兼容。经过架构重构后, NG版本演变为了一个轻量工具, 适应各种方式收集日志, 支持failover和负载均衡。

flume diagram
flume multi-agent diagram
事件(Event)

  事件是flume内部数据传输的基本单元, 也是事务的基本单元。event由可选headers(key-value键值对, 不同的source生成不同的header, 允许修改event添加header, 需要拦截器和选择器机制的支持从而对event进行路由)和body(字节数组, 事件的实际内容)组成(只支持如下格式:[{header:{"key": "value"}, body:"context"},])。事件由client产生, 经由client → agent{source(netcat, http...) → channel(memory, jdbc, file...) → sink(hdfs, hbase, null...)} → destination的路线到达目的地, 在送达sink前需要在channel缓存event, 数据达到sink后才会删除channel中的缓存,而在event从源到目的的迁移的抽象成为(flow)

数据源(Source)
flume multi-source diagram

  source从client接收数据, 并把数据以event的格式临时存放在一个或多个channel中, 直到event被sink成功发送到destination后, channel才将临时数据删除。在Spooling Directory Source中, 被发送过的数据被标记.COMPLETED(后缀名, 可自定义). source支持的类型如下:

类型 说明 实现类
Avro 支持 Avro协议(实际上是Avro RPC), AvroLegacySource(0.9.x OG版本兼容), 内置支持 AvroSource, AvroLegacySource
Thrift 支持Thrift 协议, ThriftLegacySource(0.9.x OG版本兼容), 内置支持 ThriftLegacySource
Exec 基于Unix Command在标准输出上生产数据 ExecSource
JMS 从JMS系统(消息, 主题)中读取数据, ActiveMQ已测
Spooling Directory 监控指定目录内数据变更
Netcat 监听端口(官网上只写了tcp type), 将流经端口的每一个文本行数据作为event输入 NetcatSource
Twitter 1% firehose 通过API持续下载Twitter数据(experimental)
Sequence Generator 序列生成器数据源, 生产序列数据 SequenceGeneratorSource
Syslog 读取syslog数据产生Event, 支持UDP和TCP两种协议 SyslogTcpSource, SyslogUDPSource
HTTP 基于HTTP POST或GET方式的数据源, 支持 JSON和BLOB 表示形式
ScribeSource ScribeSource
StressSource 主要用于测试, 由连续的具有相同payload的Event构成, 不用于生产环境 StressSource
(custom type as FQCN) 自定义Source (custom FQCN)
管道(Channel)

  Channel是一个临时的存储容器, 它把从source接收到的数据以event格式存储起来, 直到event被sink消费后才把临时数据删除。Channel可和多个source和sink连接, 成为source和sink的桥梁, 提供数据收发的一致性保证。常见类型为Memory, JDBC和File. Memory提供高速吞吐但无法保证数据的完整性, File通过持久化数据至磁盘保证数据的完整性。

类型 说明 实现类
Memory 存储在内存中, 高速但非持久化的管道 MemoryChannel
File 可读写、映射和操作文件的管道, 存储Event在磁盘中 FileChannel
JDBC 基于JDBC的可持久化的管道, 内置支持Derby JDBCChannel
Recoverable Memory 基于本地文件存储系统提供持久化的管道(据说建议用FileChannel代替) RecoverableMemoryChannel
Spillable Memory 存储在内存和磁盘中, 内存队列满则持久化到磁盘上(Experimental)
PseudoTxnMemory 测试用Channel, 不用于生产环境 PseudoTxnMemoryChannel
(custom type as FQCN) 自定义Channel (custom FQCN)
水槽(Sink)
flume multi-sink diagram

  Sink从channel中读取并删除event, 将event传递到Hbase、HDFS或flow pipeline的下一个agent等。Sink类型如下:

类型 说明 实现类
HDFS 写入所有Event到HDFS, 支持Rolling, Bucketing, HDFS-200 Append等 HDFSEventSink
Hbase 从Channel读取Event并写入Hbase(从Async的实现类来看似乎可以异步) HBaseSink, AsyncHBaseSink
Logger 基于配置好的日志子系统以info日志级别记录events(默认log4j) LoggerSink
Avro 对所有Event使用预置的Avro协议发送到配置的RPC端口,当同配置Avro Source时, 形成分层的集合 AvroSink
File Roll 存储在内存和磁盘中, 内存队列满则持久化到磁盘上(Experimental) RollingFileSink 
IRC event在IRC回放 IRCSink
Null 写入/dev/null, 即丢弃所有Event NullSink
Morphline Solr 写入Solr搜索服务器(Clustered)
ElasticSearch 写入Elastic Search搜索服务器(Clustered) ElasticSearchSink
Thrift 写入Thrift ThriftSink
Kite Dataset 写入到Kite Dataset(Experimental)
(custom type as FQCN) 自定义Sink (custom FQCN)
拦截器(Interceptor)
flume architecture
来源于 http://blog.csdn.net/ty_laurel/article/details/54585726

  拦截器用于source与channel之间对event数据进行处理, 主要对event的header进行CUD, 拦截器有如下类型:

类型 说明 实现类
Host 加入Agent主机名或IP地址 HostInterceptor$Builder
Timestamp 加入当前时间戳(ms) TimestampInterceptor$Builder
Static 加入一组静态的键值对(key-value) StaticInterceptor$Builder
Regex Filter 对Event Body中做正则提取出需要的匹配项 RegexFilteringInterceptor$Builder
UUID 加入UUID
Morphline Cloudera开源ETL(ElasticSearch, Logstash, Kibana)框架(目前由Kite主导开发), 富配置转化链, Grok正则解析
(custom type as FQCN) 自定义Interceptor (custom FQCN)

  关于Morphline框架在Flume中的作用, 由以下两个图展示:

morphline
来源于 https://allthingshadoop.com/2014/05
flume and morphline architecture
来源于 http://vinoyang.com/2015/11/20/build-log-sys-with-flume-and-morphline
管道选择器(Channel Selector)
类型 说明 实现类
Replicating 把从Source接收到的所有Event发往所有的Channel(default) ReplicatingChannelSelector
multiplexing 根据Event Header中的Key选择管道 MultiplexingChannelSelector
(custom type as FQCN) 自定义Channel Selector (custom FQCN)
水槽处理器(Sink Processor)

  Sink Processtor可用于激活sinks中特定sink用于负载均衡(load balance)

类型 说明 实现类
Default 单独的Sink DefaultSinkProcessor
Failover 维护Sinks优先级列表, 保证只要有一个Sink即可处理Event(故障转移)(若没有设置优先级则按声明顺序) FailoverSinkProcessor
Load Balance 维护Active Sinks索引列表, 选择机制有Round Robin(default)和Random, 也可以自定义继承AbstractSinkSelector LoadBalancingSinkProcessor
(custom type as FQCN) 自定义Sink Processor (custom FQCN)
事件序列器(Event Serializer)

  Event需要对其Body进行序列化, 主要有以下类型: EventSerializer$Builder(Text, Avro Event), EventSerializer(SimpleHbaseEventSerializer, SimpleAsyncHbaseEventSerializer, RegexHbaseEventSerializer), HbaseEventSerializer(HbaseSink的自定义序列化), AsyncHbaseEventSerializer(AsyncHbaseSink的自定义序列化), EventSerializer$Builder(除了HbaseSink和AsyncHbaseSink外其他Sink的自定义序列化)。

HDFS

hadoop logo

  Hadoop分布式文件系统(distributed fileSystem)为Hadoop(java, 前yahoo主导)核心子项目, 基于流式数据访问模式处理超大文件需求开发, 适用于一次写入多次读取的应用场景, 不适合低延迟数据访问需求, 小文件存储, 并发写入(不允许多线程同时写入同一文件)和随机修改(仅支持数据追加(append))。主要有Client(调用HDFS API, 从NameNode获取文件元数据与DataNode交互数据读写), NameNode(元数据节点, 管理元数据, 分配数据存储节点), DataNode(数据存储节点, 负责数据存储读写与冗余备份)。此项目主要采用Spark计算和Hive, Impala的查询, MapReduce使用不明显, 暂时不写MapReduce, ResourceManager(Yarn), TaskTracker以及JobTrackter计算任务框架。

  • RPC通信
  • 可构建在廉价机器上
  • 提供统一抽象目录树
  • 高容错(默认3副本, 由机架感知(rack-aware)副本在DN本地存一份, 同机架存一份, 不同机架存一份, 每个DN最多存一个副本, 每个机架最多存两个副本)
  • 2.x版本后提供高可用性, 对NN配置secondary NN, 根据checkpoint机制合并NN上的editslog(记录元数据操作)到fsimage(磁盘元数据镜像, NN宕机时快速恢复元数据用)
  • 分块(block)存储(2.x版本默认128M, 此前为64M), 有DN承担, DN发送心跳告知NN存活(alive)状态, 也汇报自身block信息(自身数据校验不通过block的不汇报)
  • NN启动先进入安全模式, 当NN确认一定百分比的block安全(block副本数达到最小值为副本安全)的30s后, NN退出安全模式可进行block备份操作
  • 数据访问模式: client(可配置数据副本数与block大小) → NN(检查, 取数据元数据和block分布(由距离client远近排序), 返回FSDataOutputStream, FSDataInputStream 对象), client(通过NN返回的对象与DN交互) → DN(读写数据)
hdfs architecture

Hive

hive logo

  Hive(java, Facebook)是构建在Hadoop上的数据仓库框架, 使用HQL(类SQL语法(不支持主键外键, 可创建索引), 转化SQL为分布式作业, 支持MapReduce(最稳), Spark或Tez)对数据执行CRUD, 提供流式API, 适用于传统数据仓库业务, 不适用于低延迟的交互访问。Hive几乎是Hadoop上的SQL标准, 适合离线ETL与大数据离线Ad-hoc查询, 以及特大规模数据集合精准结果的查询。对于需要交互式的Ad-hoc查询方案, 通常选择Impala, Presto等。

hive architecture
组件(Compenent)
hive architecture 2
  • UI: 用户向系统提交查询和其他操作的接口, 目前有Cli, Hive Web Server提供
  • Driver: 处理查询, 实现会话交互, 提供基于JDBC/ODBC接口上的执行与获取的API
  • MetaStore: 存储hive元数据(表格(table)元数据, 分区(partition)元数据, (bucket)元数据...)到关系型数据库(支持MySQL, Derby...)
  • Compiler: 基于语义分析(semantic analyzer)解析查询块和查询语句, 最终通过metastore中查询表和分区的元数据, 生成可供MapReduce, Spark或Tez执行的执行计划。HQL ———→(Antlr) 抽象语法树(AST) ——————→(semantic analyzer) 最简查询块(QB)(from子句递归生成一个QB) ——————→(logical plan) OP Tree(有向无环图) ——————→(logical optimizer) OP Tree(谓词下推(predicate push down), 分区剪裁(partition prunner), 关联排序(join reorder)) ——————→(physical plan) Task Tree(遇到分发则切一刀生成MapReduce作业, 如group by, join, distributed by, distinct...etc, 分出多个子图, 每个子图构成一个节点, 节点连成执行图) ——————→(physical optimizer) Task Tree(基于输入选择路径, 增加备份作业etc)(该过程可通过explain + statement查看)
  • ExecutionEngine: 通过管理不同阶段性计划中的依赖, 在对应的系统中(HDFS)执行Compiler生成的基于阶段性DAG的执行计划, 新版本有多种实现(MapReduce, Spark或Tez)
hive explain
来源于 http://www.codedata.cn/hacknews/146917894602293594
数据模型(Data Model)
  • Table: 类似传统关系型数据库的表, 能进行过滤, 映射, Join和Union. Table中的数据全部存储在HDFS的一个目录下。支持(external)(table), 即table可由HDFS中已存在的文件或目录创建。Column, Row等概念与传统关系型数据库类似
  • Partition: 分区基于HDFS子目录实现, 每个Table可根据partition key决定数据的存储方式。分区是表中部分列的集合。分区可以减少每次遍历的数据量改善性能。但HDFS不支持大量的子目录, 所以需要考虑分区数量避免过多子目录创建。有静态分区和动态分区: 静态分区在创建表时使用partitioned by定义(创建时需要指定column和数据类型, 加载数据(load data)时需要指定具体为该column的哪个值); 动态分区由设置set hive.exec.dynamic.partition=true;加载数据时不需要指定column的值, hive.exec.dynamic.partition.mode默认值为strick, 即不允许所有分区都是动态的, 如果需要父目录及其子目录全部为动态分区, 需要指定该参数为nostrick
  • Bucket(or Cluster): 每个分区可进一步被分桶, 分桶通过对指定column进行哈希实现(hash结果对桶数求余, 保证每个bucket都有数据), 在数据量足够大时, 分桶比分区有更高的查询效率。需要设置hive.enforce.bucketiong=true;使用关键字clustered by指定column和分桶数
hive data model
来源于 http://www.codedata.cn/hacknews/146917894602293594
数据类型(Data Type)
类型 支持 说明
TinyInt 1-byte signed integer, -128 ~ 127 后缀Y
SmallInt 2-byte signed integer, -32,768 ~ 32,767 后缀S
Int/Integer(默认) 4-byte signed integer, -2147483648 ~ 2147483647
BigInt 8-byte signed integer, -9223372036854775808 ~ 9223372036854775807 后缀L, 大于BIGINT的值需要使用BD后缀和Decimal(38,0)处理
Float 4-byte single precision floating point number
Double 8-byte double precision floating point number
Decimal 在Hive 0.11.0介绍 (HIVE-2693), 在Hive 0.13.0改进 (HIVE-3976)
TimeStamp Hive 0.8.0及其以上支持 支持传统的 UNIX 时间戳和可选的纳秒精度
Ddate Hive 0.12.0及其以上支持
Interval Hive 1.2.0及其以上支持
String 可用单双引号表示, 以C风格转义
VarChar 1 ~ 65535, 超过varchar(length)中length的长度的字串将被截断, 尾部空格影响比较结果
Char 1~255, 固定长度, 字串比指定长度短的部分由空格代替, 尾部空格不影响比较结果
Boolean true or false
Binary Hive 0.8.0及其以上支持
Array 由相同类型元素组成, 下标从0开始
Map 键值对(key-value)
Struct 可包含不同类型元素, dot(.)符号访问语法
Union Hive 0.7.0及其以上支持, 可综合以上数据类型 据说还没有完全支持,官方建议只用于查看

  复杂类型的使用方法:

create table employees (
    name string,
    salary float,
    subordinates array<string>,
    deductions map<string, float>,
    address struct<street:string, city:string, state:string, zip:int>
) partitioned by (country string, state string);
优化(Optimization)

  Hive中最重要的部分是Group By和Join, MapReduce就是Group By或者Join的过程。

  • Group By: 可在执行SQL前设置hive.map.aggr = true, 使Map端发往Reduce端前部分聚合减少数据量; 也可以设置hive.groupby.skewindata = true把一个MR job转化为两个, 第一道MR job输出结果随机分发Reduce端, 每个Reduce端按Group By的key部分聚合, 第二道MR job根据预处理结果按Group By的key分布到Reduce端完成最终聚合。二者原理都是通过部分聚合减少Reduce端的数据量, 而聚合的有效性和效率通常与UDAF(user-defined aggregation funcation)有关, 且只对代数聚合函数(count, sum...)有效, 对整体聚合函数(avg, mean...)无效
  • Join: 应该尽可能把分布均匀的表放在左边, 把倾斜表放在右边; 通过使用MAPJOIN(<小表名>)把小表打成一个哈希表序列化文件的压缩包, 通过分布式缓存均匀分发到各个job执行节点上, 然后在节点解压在内存中完成关联(此处没有用到Reduce, 也不存在数据倾斜), 默认情况下小表 $\le$ 25M, MAPJOIN通常不会手动设置, 而是通过Hive中的physical optimizer把join优化为auto map join, 通过给Task设置一个conditional task把任务划分为$\le$ 25M和$\gt$ 25M, 新版Hive中默认hive.auto.convert.join = true

Impala

impala logo

  Impala(c++)在Cloudera受Google dremel启发下开发的实时SQL查询引擎(分布式大规模并行处理(MPP)数据库引擎), 功能上与shark(依赖Hive)和Drill(apache)类似。它脱离了Hive中MR批处理的缓慢, 采用与商用并行关系数据库类似的由planner, coordinator, exec engine组成的分布式查询引擎降低延迟。

impala architecture

Impala特性:

  • 基于内存, 对内存依赖大
  • 不再转化为MR
  • C++编写, 运行时代码生成LLVM IR(low level virtual machine intermediate representation)提高效率
  • 完全依赖于Hive, 直接使用Hive的元数据, 兼容HQL, 具有数据仓库特性, 可对Hive数据进行查询, 稳定性低于Hive
  • 支持data local, 列式存储, JDBC/ODBC查询以及SQL92标准, 拥有自己的解析器和优化器
  • Parquet列式存储
Impala Daemon

  Impala Daemon是一个在集群的每个DN上运行的守护进程(impalad), 主要接收client, hue, jdbc/odbc请求, 执行query并返回给协调节点, 负责与statestore通信汇报节点状态。在2.9+版本中, 可以分配集群中coordinator和exec engine角色给不同主机从而提高高并发负载的可伸缩性。impalad包含了planner, coordinator, exec engine, 一般impalad与DN在同一节点(data local)。当某个分发执行的impalad失败时, 整个计划任务都返回失败, 不过再次提交一次查询也没有多少消耗。

  • Query Planner: 解析SQL为执行计划
  • Query Coordinator: 指定查询的主节点(某个impalad), 通知其他节点主节点的信息, 主节点待其他节点查询结果返回给它后, 主节点再返回给中心协调节点
  • Query Exec Engine: 做查询工作

  Impalad的查询执行分为frontend(java, 以JNI嵌入impalad, 生成查询计划)和backend(c++, 执行查询)。Frontend先生成单机查询计划(与关系型数据库执行计划相同, 查询优化方法类似), 后生成分布式查询计划(减少数据移动, 数据与计算尽量放一起)。

impala query

  上图为三张表join后做聚合再排序取topN的例子。Impala查询优化器支持代价模型(以表和分区为基数, 每列distinct值个数做统计估算执行计划代价, 从而生成较优执行计划)。可以看到上图左侧则为frontend生成的单击查询计划, 进而转换为6个segment(彩色无边框圆角矩形背景, 每个segment可由单个主机独立执行, 为计划子树)的分布式查询计划。Impala支持表广播(把join中一个表广播到相关节点, 如图t3)和哈希重分布(根据join字段哈希值重新分布两张表数据, 如图t1, t2)两种分布式join方式。分布式查询计划中的聚合函数先对本地数据进行分组聚合(如图Pre-Agg)降低数据量和数据重分布, 再把上一步的结果在汇总聚合(如图MergeAgg)计算出最终结果, topN计划过程同理。BackEnd从frontend接收plan segment执行, 执行性能优化方面, 有向量执行(getNext处理一批数据, 多个操作符做pipeline), LLVM IR, IO本地化以及parquet列式存储。

Impala Statestore

  Impala Statestore负责收集各个impalad进程的资源信息, 各节点健康情况以及同步节点信息。只需要在一个主机上运行这样的一个进程即可(statestored), 如果某个impalad因为硬件故障, 网络错误, 软件问题或其他原因脱机, statestore会通知其他impalad避免向不可访问节点发出查询请求。statestore在impala集群中并不是一个关键进程, 如果statestore未运行或无法访问, impalad照常继续运行并分配工作, 相对的集群会缺少健壮性。

Impala Catalog

  Impala Catalog(Impala 1.2+)把Impala表的元数据分法到各个impalad中, 实现DDL。当impalad节点插入或查询时, impalad把自己的操作结果通知statestore, 之后statestore通知catalogd更新元数据信息。所以一般把catalogd和statestored放在同一个主机上运行, 且该主机不再运行impalad提供查询服务, 避免集群管理出现问题。

Impala文件格式
类型 格式 压缩 Create Insert
Parquet Structured Snappy
GZIP
Text Unstructured LZO
如果建表时没有指定存储类型,默认采用未压缩的 text,字段由 ASCII 编码的 0x01 字符串分割

如果使用了 LZO 压缩,则只能通过 Hive 建表和插入数据
Avro Structured Snappy
GZIP
Deflate
BZIP2

Impala 1.4.0+支持,之前的版本只能通过 Hive 来建表。

只能通过 LOAD DATA 的方式将已经转换好格式的数据加载进去,或者使用 Hive 来插入数据
RCFile Structured Snappy
GZIP
Deflate
BZIP2

只能通过 LOAD DATA 的方式将已经转换好格式的数据加载进去,或者使用 Hive 来插入数据
SequenceFile Structured Snappy
GZIP
Deflate
BZIP2

只能通过 LOAD DATA 的方式将已经转换好格式的数据加载进去,或者使用 Hive 来插入数据
Impala压缩编码
类型 说明
Snappy 推荐编码, 压缩率和压缩速度有很好的平衡性, snappy压缩速度很快, 但比gzip压缩率低, Impala不支持snappy压缩的text file
GZIP 压缩率高, Impala不支持gzip格式的text file
Deflate Impala不支持deflate压缩的text file
BZIP2 Impala不支持bzip2压缩的text file
LZO 只用于text file, Impala可查询lzo压缩的text格式数据表但不支持insert, 只能通过Hive完成insert

Spark

机器学习(Machine Learning)

Scala

操作分析(Operational Analytics)

开放数据模型(Open Data Model)

用例(UseCase)

分析(Analysis)


模块(Module)

Spot-Ingest

spot-ingest framework

  Spot-Ingest基于分布式架构, 使用spot-collector守护进程针对不同采集源产生数据时, 通过kafka, spark streaming, hive和hdfs提供近99.99999%的数据完整性。

Spot-Collector

  Spot-Collector基于文件系统在后台监视文件系统的新文件。当网络工具产生新文件或较早产生的数据留在监视路径上时, collectors使用解析工具(比如nfdump和tshark)转化为可读格式, 并把其原始格式存储到Hadoop用于取证, 以avro-parquet格式存储在Hive中以便做SQL查询。$\gt$ 1MB的文件提供其文件名和hdfs路径给kafka, $\lt$ 1MB的文件提供其data event给kafka并在之后由spark streaming做进一步处理(在其github上描述是proxy的pipeline才用spark streaming处理, 不知道一般proxy数据是否是$\lt$ 1MB的)。

Spot-Ingest Kafka
spot-ingest framework

  从图上看是生成flow, dns和proxy三个主题, 各主题分区数由spot-worker的数量决定, spot-collectors作为提供者传输数据到kafka存储, spot-worker作为消费者消费kafka中三个主题的数据。

Spot-Worker

  Spot-Worker作为后台守护进程订阅指定的kafka主题和分区, 并在特定的Hive表中读取解析存储数据, 该数据在将来由ML算法消费。当前有两种worker, 通过定义的解析器多线程处理数据的python worker和使用spark-streaming context(micro batching)执行spark应用处理来自kafka数据的spark-streaming worker.

Spot-ML

spot-ml

  Spot-ML包含执行可疑连接的例程, 分析采集自网络中的netflow, dns或proxy日志。通过分析一系列网络事件, 生成一个最不可能和最可疑的事件列表, 该过程依赖于spot-ingest来加载netflow, dns和proxy记录。它使用主题建模(topic model)来发现正常和异常行为, 把IP关联的日志集合作为文档, 并使用Latent Dirichlet Allocation(LDA)来发现这些文档集合中隐藏的语义结构。Spot-ML为每个IP地址的网络行为提供概率模型(probabilistic model), 即赋予每个网络日志条目被该模型一个估算的概率(或得分), 得分较低的事件被标记为可疑以便进一步分析。
  LDA基于三层的贝叶斯模型, 是一个用于离散(discrete)数据的生成概率模型(generative probabilistic model), 例如文本全集。在这个模型中文档的每个单词(word)都是由一组基础主题混合生成的。LDA在网络流量中, 通过聚合和离散将网络日志条目转换为单词。这样, 文档对应IP地址, 日志条目的单词(与一个IP地址相关)和主题对应公共网络活动的概要文档。

Spot-OA

Spot-Setup

环境(Environment)

environment

演示(Demo)

参考(Reference)


  [01]  kafka 基础知识梳理, Go_小易, 2017-08-14 17:57
  [02]  kafka 学习笔记:知识点整理, cyfonly, 2016-10-12 22:13
  [03]  kafka 数据可靠性深度解读, 朱小厮, 2017-05-02 19:19
  [04]  Kafka 消费组 (consumer group), heidsoft, 2017-10-20 09:53
  [05]  Flume NG 基本架构及原理, dantezhao, 2016-09-14 21:52
  [06]  Flume 架构以及应用介绍, 安静的技术控, 2016-05-31 12:35
  [07]  日志系统之 Flume 采集加 morphline 解析, yanghua, 2015-11-20
  [08]  flume 拦截器及问题解决, ty_laurel, 2017-01-17 18:50
  [09]  【漫画解读】HDFS 存储原理, 雪飘飘, 2016-02-22 13:50
  [10]  深刻理解 HDFS 工作机制, Pickle, 2017-01-11 08:59
  [11]  Apache Hive Design, Administrator, 2015-11-08
  [12]  杨卓荦:Hive 原理及查询优化, anand, 2016-07-07
  [13]  Impala(多图手机用户慎入,理论 + 实践), SET, 2016-09-21 23:33
  [14]  怎么理解 impala(impala 工作原理是什么), 邱明成, 2017-02-12 09:28
  [15]  Apache Spot, 2016-03-29

To be Continued...

Last modification:May 26th, 2018 at 12:24 am
If you think my article is useful to you, please feel free to appreciate

Leave a Comment

captcha