0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

共享内存的原理和广告埋点数据采集实战分析

454398 来源:Chinaunix 作者:cluo 2020-09-30 14:28 次阅读

一、前言

共享内存广泛用于Redis,Kafka,RabbitMQ 等高性能组件中,本文主要提供一个共享内存在广告埋点数据采集的实战场景。

二、共享内存原理

1、原理

Linux中,每个进程都有属于自己的进程控制块(PCB)和地址空间(Addr Space),并且都有一个与之对应的页表,负责将进程的虚拟地址与物理地址进行映射,通过内存管理单元(MMU)进行管理。两个不同的虚拟地址通过页表映射到物理空间的同一区域,它们所指向的这块区域即共享内存。

当两个进程通过页表将虚拟地址映射到物理地址时,在物理地址中有一块共同的内存区,即共享内存,这块内存可以被两个进程同时看到。这样当一个进程进行写操作,另一个进程读操作就可以实现进程间通信。但是,我们要确保一个进程在写的时候不能被读,因此我们使用信号量来实现同步与互斥。

对于一个共享内存,实现采用的是引用计数的原理,当进程脱离共享存储区后,计数器减一,挂架成功时,计数器加一,只有当计数器变为零时,才能被删除。当进程终止时,它所附加的共享存储区都会自动脱离。

2、与传统文件对比

共享内存可以说是最有用的进程间通信方式,也是最快的IPC形式, 因为进程可以直接读写内存,而不需要任何 数据的拷贝。对于像管道和消息队列等通信方式,则需要在内核和用户空间进行四次的数据拷贝 共享内存则只拷贝两次数据: 一次从输入文件到共享内存区,另一次从共享内存区到输出文件。

实际上,进程之间在共享内 存时,并不总是读写少量数据后就解除映射,有新的通信时,再重新建立共享内存区域。而是保持共享区域,直 到通信完毕为止,这样,数据内容一直保存在共享内存中,并没有写回文件。共享内存中的内容往往是在解除映 射时才写回文件的。因此,采用共享内存的通信方式效率是非常高的。

传统文件

UNIX 访问文件的传统方法是用 open 打开它们,如果有多个进程访问同一个文件,则每一个进程在自己的地址空间都包含有该文件的副本,这不必要地浪费了存储空间。

下图说明了两个进程同时读一个文件的同一页的情形。系统要将该页从磁盘读到高速缓冲区中,每个进程再执行一个存储器内的复制操作将数据从高速缓冲区读到自己的地址空间。

共享存储映射

现在考虑另一种处理方法:进程 A 和进程 B 都将该页映射到自己的地址空间,当进程 A 第一次访问该页中的数据时, 它生成一个缺页中断。内核此时读入这一页到内存并更新页表使之指向它。以后,当进程B访问同一页面而出现缺页中断时,该页已经在内存,内核只需要将进程 B 的页表登记项指向次页即可。

3、mmap()

(1)mmap()系统调用

mmap()系统调用使得进程之间通过映射同一个普通文件实现共享内存。普通文件被映射到进程地址空间后,进程可以向访问普通内存一样对文件进行访问,不必再调用read(),write()等操作。

mmap()系统调用形式如下:

1void* mmap ( void * addr , size_t len , int prot , int flags , int fd , off_t offset )

mmap的作用是映射文件描述符fd指定文件的 [off,off + len]区域至调用进程的[addr, addr + len]的内存区域:

数fd为即将映射到进程空间的文件描述字,一般由open()返回,同时,fd可以指定为-1,此时须指定flags参数中的,MAP_ANON,表明进行的是匿名映射(不涉及具体的文件名,避免了文件的创建及打开,很显然只能用于具有亲缘关系的进程间通信)。

len是映射到调用进程地址空间的字节数,它从被映射文件开头offset个字节开始算起。

prot 参数指定共享内存的访问权限。可取如下几个值的或:PROT_READ(可读) , PROT_WRITE (可写), PROT_EXEC (可执行), PROT_NONE(不可访问)。

flags由以下几个常值指定:MAP_SHARED , MAP_PRIVATE , MAP_FIXED,其中,MAP_SHARED , MAP_PRIVATE必选其一,而MAP_FIXED则不推荐使用。

offset参数一般设为0,表示从文件头开始映射。

参数addr指定文件应被映射到进程空间的起始地址,一般被指定一个空指针,此时选择起始地址的任务留给内核来完成。函数的返回值为最后文件映射到进程空间的地址,进程可直接操作起始地址为该值的有效地址。

(2)mmap()返回地址的访问

对mmap()返回地址的访问,linux采用的是页式管理机制。

对于用mmap()映射普通文件来说,进程会在自己的地址空间新增一块空间,空间大小由mmap()的len参数指定,注意,进程并不一定能够对全部新增空间都能进行有效访问。

进程能够访问的有效地址大小取决于文件被映射部分的大小。

简单的说,能够容纳文件被映射部分大小的最少页面个数决定了进程从mmap()返回的地址开始,能够有效访问的地址空间大小。

超过这个空间大小,内核会根据超过的严重程度返回发送不同的信号给进程。可用如下图示说明:

三、VCS 共享内存采集实战

VCS(vivo control system): 负责全网所有类型的监控指标采集,为上游运维平台提供底层命令通道能力和全网插件升级管控能力。

1、数据结构

2、分区读写

为了要确保一个进程在写的时候不能被读,我们使用idx来标记可读块。

3、规则,指标和值

下图描述的是从连续内存空间转化成【规则,维度,值】语义的过程:

4、源码分析

5、general.proto

通用监控上报协议:

general.proto

syntax = “proto2”;

package general;

message Data {

map kv = 1;

}

message GeneralData {

optional string rule_id = 1;

repeated Data data = 2;

optional int64 count = 3;

optional int64 left_size = 4;

optional int32 version = 5;

}

6、constant.go 配置参数

| 4k protect | magincNum1(4byte) | idx(4byte) | OssMapSz(1024*128byte)*2 | 4*64byte预留长度 | magincNum2(4byte) | 4k protect |

package moni_shm

const (

OssShmId uint32 = 0x3eeff00

MagicNum1 uint32 = 0x650a218

MagicNum2 uint32 = 0x138a4f2

CreateShmLock = “/var/run/.oss_shm_lock”

OssMapOneAttrCnt = 1024 * 128 //1024 个规则

OssOneAttrEntryCnt = 128 //每个规则有128个指标

EntrySz = 4

OssMapCnt = 2

OneAttrSz = OssOneAttrEntryCnt * EntrySz

OssMapSz = OssMapOneAttrCnt * OneAttrSz

OssAttrSz = OssMapSz*OssMapCnt + 4 + 4 + 64*4 + 4

defaultIntervalSec = 60

defaultTopic = “moni_general_shared_memory”

7、util.go 工具类

内存清零工具和“整页”分配:

cd package moni_shm

import (

“unsafe”

//取整分配

func align(actual, to uint64) uint64 {

return (actual + to - 1) / to * to

}

//连续空间清0

func zero(ptr uintptr, bts uint64) {

if 0 == bts {

return

}

const sz = 4096

var next uint64

cnt := 0

for ; next+sz 《= bts; { //按页清零

arr := (*[sz]byte)(unsafe.Pointer(ptr))

for i := range *arr {

(*arr)[i] = 0

}

next += sz

ptr += uintptr(sz)

cnt++

}

if next == bts {

return

}

var i uintptr

for i = 0; i 《 uintptr(bts-next); i++ { //剩余空间清零

*(*byte)(unsafe.Pointer(ptr + i)) = 0

}

}

8、mgr.go 采集逻辑

共享内存采集逻辑对应 “规则指标和值”:

var (

_basePtr uintptr = 0

_shmUtil = NewShmUtil(OssShmId, OssAttrSz)

_intervalSec = defaultIntervalSec

_topic = defaultTopic

_on bool = false

func Stat(on bool) {

_on = on

}

func Start() {

go collect() //开始采集

}

func tryInitBaseptr() error {

var err error

if _basePtr == 0 {

_basePtr, err = _shmUtil.GetData() //获取当前共享内存数据块首地址

if nil != err {

logrus.Warnf(“init base ptr failed, retrying: %v”, err)

}

}

return err

}

func collect() {

var (

cost time.Duration

start time.Time

first = true

for {

if !first {

time.Sleep(time.Second*(time.Duration(_intervalSec)) - cost) //周期对齐

}

first = false

start = time.Now()

if !_on {

cost = time.Since(start)

continue

}

if _basePtr == 0 {

if err := tryInitBaseptr(); nil != err {

cost = time.Since(start)

continue

}

}

d := collectOnce()

for _, v := range d {

moni_report.ProductReportData(*v)

}

cost = time.Since(start)

}

}

func collectOnce() []*moni_report.ReportData {

now := time.Now()

var ret []*moni_report.ReportData

data := make(map[uint32]*general.GeneralData)

d := SwitchAndFetch(_basePtr)

logrus.Infof(“sending %d data from shm”, len(d))

for _, v := range d {

ruleId := strconv.FormatUint(uint64(v[0]), 10)

dim := strconv.FormatUint(uint64(v[1]), 10)

value := strconv.FormatUint(uint64(v[2]), 10)

if _, ok := data[v[0]]; !ok {

data[v[0]] = &general.GeneralData{

RuleId: proto.String(ruleId),

Data: []*general.Data{},

}

}

data[v[0]].Data = append(data[v[0]].Data, &general.Data{

Kv: map[string]string{

dim: value,

“timestamp”: strconv.FormatInt(now.Unix()*1000, 10),

“ip”: viper.GetString(“host.inner_ip”),

},

})

}

logrus.Infof(“collect format shm data:%v”, data)

for _, v := range data {

bts, err := proto.Marshal(v)

if nil != err {

logrus.Errorf(“marshal shm data failed: %v”, err)

continue

}

ret = append(ret, &moni_report.ReportData{

DataBytes: bts,

Topic: _topic,

})

}

return ret

}

9、shmutil.go 共享内存操作

每60秒根据idx值切换可读区,采集后上报后,清零,切换到下一区。

package moni_shm

import (

“fmt”

“log”

“os”

“syscall”

“unsafe”

“github.com/sirupsen/logrus”

const (

IpcCreate = 00001000

var (

ErrNotCreated = fmt.Errorf(“shm not created”)

ErrCreateFailed = fmt.Errorf(“shm create failed”)

type shmOpt func(*ShmUtil)

func WithCreate(b bool) shmOpt {

return func(u *ShmUtil) {

u.create = b

}

}

/*共享内存数据结构

|1page mprotect|page align data|1page mprotect|

| 4k protect | magincNum1(4byte) | idx(4byte) | OssMapSz(1024*128byte)*2 | 4*64byte预留长度 | magincNum2(4byte) | 4k protect |

*/

type ShmUtil struct {

pageSz int

dataSz uint64

total uint64

shmKey uint32

create bool

base uintptr

data uintptr

}

func NewShmUtil(key uint32, sz uint64, cfgs 。。.shmOpt) *ShmUtil {

if key == 0 {

panic(“invalid shm key: 0”)

}

ret := &ShmUtil{

dataSz: sz,

shmKey: key,

}

ret.pageSz = os.Getpagesize() //获取页大小

ret.dataSz = align(ret.dataSz, uint64(ret.pageSz)) //按页分配“包体”大小

ret.total = ret.dataSz + uint64(ret.pageSz)*2 // 总空间大小=包体大小 + 头尾各2页保护地址

for _, c := range cfgs {

c(ret)

}

return ret

}

func (s *ShmUtil) attachShm(flag int) error {

created := false

shmid, _, errno := syscall.Syscall(syscall.SYS_SHMGET, uintptr(s.shmKey), uintptr(s.total), uintptr(flag)) //使用已存在的共享内存,返回共享内存标识符

if 0 != errno {

return errno

}

if shmid 《 0 {

if !s.create { //不允创建,直接返回

return ErrNotCreated

}

shmid, _, errno = syscall.Syscall(syscall.SYS_SHMGET, uintptr(s.shmKey), uintptr(s.total), uintptr(flag|IpcCreate)) //新创建共享内存

if 0 != errno {

return fmt.Errorf(“shm create: %v”, errno)

}

if shmid 《 0 {

return ErrCreateFailed

}

created = true

}

addr, _, errno := syscall.Syscall(syscall.SYS_SHMAT, shmid, 0, 0) //挂接共享内存到当前进程

if 0 != errno {

return fmt.Errorf(“shmat: %v”, errno)

}

if created {

zero(addr, s.total)//新创建的共享内存,初始化共享内存数据

}

s.base = addr //记录共享内存首地址 用于之后的释放

s.data = s.base + uintptr(s.pageSz) //写数据的起始地址

_, _, errno = syscall.Syscall(syscall.SYS_MPROTECT, s.base, uintptr(s.pageSz), 0)

if 0 != errno { //锁定共享内存头,锁指定的内存区间必须包含整个内存页(4K)

s.detach()

return fmt.Errorf(“mprotect head: %v”, errno)

}

_, _, errno = syscall.Syscall(syscall.SYS_MPROTECT, s.data+uintptr(s.dataSz), uintptr(s.pageSz), 0) //锁指定共享内存尾,区间开始的地址start必须是一个内存页的起始地址,并且区间长度len必须是页大小的整数倍。

if 0 != errno {

s.detach()

return fmt.Errorf(“mprotect tail: %v”, errno)

}

return nil

}

func (s *ShmUtil) detach() { //进程去关联共享内存

if 0 != s.base {

syscall.Syscall(syscall.SYS_SHMDT, s.base, 0, 0)

s.base = 0

s.data = 0

}

}

/*

获取内存并且返回数据段起始位置

s.create 决定是否新申请共享内存

*/

func (s *ShmUtil) GetData() (uintptr, error) {

if s.data != 0 {

return s.data, nil

}

if err := s.attachShm(0666); nil != err { //初始化共享内存,并关联到进程

return 0, err

}

return s.data, nil

}

func SwitchAndFetch(ptr uintptr) [][3]uint32 { //从共享内存读取 [][3]uint32{ossid,key,value}

if ptr == 0 {

return nil

}

m1 := (*uint32)(unsafe.Pointer(ptr))

m2 := (*uint32)(unsafe.Pointer(ptr + 8 + OssMapSz*2 + 4*64))

if MagicNum1 != *m1 || MagicNum2 != *m2 {

logrus.Errorf(“magic 1 in header: wrote:%v\tread:%v\n”, MagicNum1, *m1)

logrus.Errorf(“magic 2 in tail: wrote:%v\tread:%v\n”, MagicNum2, *m2)

return nil

}

idx := (*uint32)(unsafe.Pointer(ptr + 4)) //切换块标志

old := *idx

*idx = 1 - *idx

ret := PartialRead(ptr, old) //读取当前idx块数据

zero(ptr+8+uintptr(old)*OssMapSz, OssMapSz) //读完清0

return ret

}

//根据idx轮流读数据区域

func PartialRead(ptr uintptr, idx uint32) [][3]uint32 { //根据idx获取块起始地址

startPtr := ptr + 8 + uintptr(idx)*OssMapSz

ret := ReadOssMap(startPtr)

log.Printf(“result: %v\n”, ret)

return ret

}

func ReadOssMap(ptr uintptr) [][3]uint32 { //1个周期内的指标总容量为 128*1024 = 128k = 13W

var ret [][3]uint32

var i uint32 = 0

for i = 0; i 《 OssMapOneAttrCnt; i++ { //1个周期最多支持1024个业务

for _, v := range ReadOneAttr(ptr) {

ret = append(ret, [3]uint32{i, v[0], v[1]}) // [osID,keyID,value]

}

ptr += OneAttrSz // OneAttrSz = OssOneAttrEntryCnt * EntrySz= 128*4

}

return ret

}

func ReadOneAttr(ptr uintptr) [][2]uint32 {

var ret [][2]uint32

var i uint32 = 0

for i = 0; i 《 OssOneAttrEntryCnt; i++ { //目前默认一个业务下最多有128单维度指标, OssOneAttrEntryCnt = 128

v := *(*uint32)(unsafe.Pointer(ptr))

if v != 0 {

ret = append(ret, [2]uint32{i, v}) // [keyID, value]

}

ptr += EntrySz // 4yte 读取一个指标

}

return ret

}

四、总结

本文通过共享内存的原理和详细分析了一个共享内存在生产上的应用场景,希望能为大家抛砖引玉。
编辑:hfy

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • Linux
    +关注

    关注

    87

    文章

    11322

    浏览量

    209869
  • 内存管理
    +关注

    关注

    0

    文章

    168

    浏览量

    14161
  • 共享内存
    +关注

    关注

    0

    文章

    16

    浏览量

    8325
收藏 人收藏

    评论

    相关推荐

    可与MES系统集成的数据采集监控平台

    和协同。 数据安全与合规: 采取加密技术、访问控制等安全措施,保护数据的机密性和完整性。 遵守相关标准,确保数据的合规性。 数据采集监控平台提高了生产效率,通过实时监控和
    发表于 12-16 15:08

    PLC数据采集模块选型指南

    数据类型 :模拟信号(如温度、压力)或数字信号(如开关状态、计数器)。 数据量 :需要采集数据点数量。 采样率 :数据更新的频率。 精
    的头像 发表于 11-26 11:46 629次阅读

    振弦式土压力计的数据采集方法

    空隙或松动。可以采用预、钻孔等方式进行安装,并使用合适的固定材料将土压力计固定在安装位置上。   连接数据采集设备   将振弦式土压力计与数据采集设备连接起来。常见的数据采集设备有振
    发表于 10-25 14:26

    IOT数据采集平台的功能特点

    的深远影响。 IOT数据采集平台的定义 IOT数据采集平台是一种专门用于物联网数据采集、处理和分析的平台。它通过连接各种工业设备、传感器、仪器仪表、工业机器人等,实现对设备
    的头像 发表于 09-25 13:28 609次阅读

    工控数据采集物联网平台是什么

    工控数据采集物联网平台是一种集成化的软件系统,它主要用于在工业环境中收集、处理、分析和管理来自各种设备和传感器的数据。这种平台结合了物联网(IoT)技术,能够实现对工业设备的远程监控、管理和控制
    的头像 发表于 07-23 15:29 395次阅读

    PLC采集网关如何实现多品牌PLC数据采集

    在工业自动化领域,PLC是不可或缺的核心设备,它负责控制生产线的各个环节,确保生产过程的顺利进行。然而,随着工业自动化水平的不断提高,如何有效地采集、处理和分析PLC数据,成为了企业面临的重要挑战
    的头像 发表于 07-11 17:08 485次阅读

    NI数据采集板卡如何连接使用?

    NI(National Instruments)数据采集板卡是一种常用的工业级数据采集设备,广泛应用于科学研究、工程测试、自动化控制等领域。本文将介绍如何连接产品并使用NI数据采集板卡进行数据
    的头像 发表于 07-11 10:05 1036次阅读

    多通道数据采集仪怎么用的

    连接、软件设置、数据采集数据分析等方面的内容。 一、多通道数据采集仪概述 1.1 多通道数据采集仪的定义 多通道数据采集仪是一种能够同时
    的头像 发表于 07-02 09:08 673次阅读

    plc物联网数据采集平台是什么

    PLC物联网数据采集平台是基于物联网技术,将多个PLC设备连接到云端的数据采集与管理系统。通过采集分析PLC产生的数据,实现对生产过程的实
    的头像 发表于 06-24 15:18 825次阅读

    工控数据采集平台是什么?

    工控数据采集平台是一种用于工业控制和监测领域的系统,该平台能够收集来自传感器、执行器、机械设备以及其他系统的信息,并对这些数据进行处理、存储和分析。这些数据可以是关于生产过程、设备状态
    的头像 发表于 06-14 15:29 431次阅读

    工业数据采集网关功能优势

    随着工业4.0和物联网(IoT)技术的快速发展,工业数据采集网关已成为现代工业生产中不可或缺的一部分。这些网关设备充当了连接现场设备与上层管理系统的桥梁,它们实时收集、传输和分析来自生产线的数据,为
    的头像 发表于 05-11 17:51 868次阅读
    工业<b class='flag-5'>数据采集</b>网关功能优势

    数据采集边缘网关解决企业数据采集痛点的关键

    网关 应运而生,成为解决企业数据采集痛点的关键所在。 一、企业背景与痛点分析 在当前信息化、智能化的时代背景下,许多企业面临着海量数据采集和处理的难题。这些企业通常拥有多个分散的业务场景,如工厂生产线、物流仓库
    的头像 发表于 04-07 13:56 400次阅读

    内存共享原理解析

    内存共享是一种在多个进程之间共享数据的机制,它允许不同的进程直接访问同一块内存区域,从而实现数据
    的头像 发表于 02-19 15:11 1337次阅读
    <b class='flag-5'>内存</b><b class='flag-5'>共享</b>原理解析

    数据采集器是什么设备 数据采集器属于什么设备类型

    数据采集器是一种用于采集和记录数据的设备。它可以连接到各种传感器、仪器或其他数据源,收集数据并将其存储在内部存储器或外部设备中,以供后续
    的头像 发表于 02-04 10:27 4181次阅读

    plc数据采集模块的缺点 plc数据采集模块与数据采集卡的区别

    PLC(可编程逻辑控制器)数据采集模块是用于连接传感器、执行器和机器设备,收集实时数据的设备。虽然PLC数据采集模块在工业自动化领域得到了广泛应用,但它仍然存在一些缺点,而与之相比,数据采集
    的头像 发表于 01-19 14:20 1859次阅读