apache|基于 Confluent+Flink 的实时数据分析最佳实践( 二 )


' 'properties.group.id' = 'gamegroup' 'format' = 'json' 'properties.ssl.truststore.location' = '/flink/usrlib/truststore.jks' 'properties.ssl.truststore.password' = '[your truststore password
' 'properties.security.protocol'='SASL_SSL' 'properties.sasl.mechanism'='PLAIN' 'properties.sasl.jaas.config'='org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx[集群的用户
\" password=\"xxx[相应的密码
\";');-- 创建累计消费hologres sink表CREATE TEMPORARY TABLE consume( appkey STRING serverid STRING servertime STRING roleid STRING amount DOUBLE dt STRING PRIMARY KEY (appkeydt) NOT ENFORCED )WITH ( 'connector' = 'hologres' 'dbname' = 'mydb' 'endpoint' = 'hgprecn-cn-tl32gkaet006-cn-beijing-vpc.hologres.aliyuncs.com:80' 'password' = '[your appkey secret
' 'tablename' = 'consume' 'username' = '[your app key
' 'mutateType' = 'insertorreplace' );--{\"appkey\":\"appkey1\"\"servertime\":\"2020-09-30 14:10:36\"\"consumenum\":33.8\"roleid\":\"roleid1\"\"serverid\":\"1\"--{\"appkey\":\"appkey2\"\"servertime\":\"2020-09-30 14:11:36\"\"consumenum\":30.8\"roleid\":\"roleid2\"\"serverid\":\"2\"--{\"appkey\":\"appkey1\"\"servertime\":\"2020-09-30 14:13:36\"\"consumenum\":31.8\"roleid\":\"roleid1\"\"serverid\":\"1\"--{\"appkey\":\"appkey2\"\"servertime\":\"2020-09-30 14:20:36\"\"consumenum\":33.8\"roleid\":\"roleid2\"\"serverid\":\"2\"--{\"appkey\":\"appkey1\"\"servertime\":\"2020-09-30 14:30:36\"\"consumenum\":73.8\"roleid\":\"roleid1\"\"serverid\":\"1\" -- 计算每个用户累积消费金额 insert into consume SELECT appkeyLAST_VALUE(serverid) as serveridLAST_VALUE(servertime) as servertimeLAST_VALUE(roleid) as roleid sum(consumenum) as amount substring(servertime110) as dt FROM kafka_game_consume_source GROUP BY appkeysubstring(servertime110) having sum(consumenum)0; 在高级配置里 , 增加依赖文件truststore.jks(访问内部域名得添加这个文件 , 访问公网域名可以不用) , 访问依赖文件的固定路径前缀都是/flink/usrlib/(这里就是/flink/usrlib/truststore.jks) 点击上线按钮 , 完成上线 在运维作用列表里找到刚上线的作用 , 点击启动按钮 , 等待状态更新为running , 运行成功 。在control center的【Topics-Messages】页面 , 逐条发送测试消息 , 格式为: {\"appkey\":\"appkey1\"\"servertime\":\"2020-09-30 14:10:36\"\"consumenum\":33.8\"roleid\":\"roleid1\"\"serverid\":\"1\"{\"appkey\":\"appkey2\"\"servertime\":\"2020-09-30 14:11:36\"\"consumenum\":30.8\"roleid\":\"roleid2\"\"serverid\":\"2\"{\"appkey\":\"appkey1\"\"servertime\":\"2020-09-30 14:13:36\"\"consumenum\":31.8\"roleid\":\"roleid1\"\"serverid\":\"1\"{\"appkey\":\"appkey2\"\"servertime\":\"2020-09-30 14:20:36\"\"consumenum\":33.8\"roleid\":\"roleid2\"\"serverid\":\"2\"{\"appkey\":\"appkey1\"\"servertime\":\"2020-09-30 14:30:36\"\"consumenum\":73.8\"roleid\":\"roleid1\"\"serverid\":\"1\" 2.4 查看用户充值金额实时统计效果 三、最佳实践-电商实时PV和UV统计-Confluent+实时计算Flink+RDS 3.1 新建Confluent消息队列 在confluent集群列表页 , 登录control center 在左侧选中Topics , 点击Add a topic按钮 , 创建一个名为pv-uv的topic , 将partition设置为3 3.2 创建云数据库RDS结果表 登录 RDS 管理控制台页面 , 购买RDS 。 确保RDS与Flink全托管集群在相同region , 相同VPC下 添加虚拟交换机网段(vswitch IP段)进入RDS白名单 , 详情参考:设置白名单文档 3.【vswitch IP段】可在 flink的工作空间详情中查询 在【账号管理】页面创建账号【高权限账号】 数据库实例下【数据库管理】新建数据库【conflufent_vvp】 使用系统自带的DMS服务登陆RDS , 登录名和密码输入上面创建的高权限账户 双击【confluent_vvp】数据库 , 打开SQLConsole , 将以下建表语句复制粘贴到 SQLConsole中 , 创建结果表 CREATE TABLE result_cps_total_summary_pvuv_min( summary_date date NOT NULL COMMENT '统计日期' summary_min varchar(255) COMMENT '统计分钟' pv bigint COMMENT 'pv' uv bigint COMMENT 'uv' currenttime timestamp COMMENT '当前时间' primary key(summary_datesummary_min)) 3.3 创建实时计算VVP作业 1.【[VVP控制台】新建文件 在SQL区域输入以下代码: --数据的订单源表CREATE TABLE source_ods_fact_log_track_action ( account_id VARCHAR --用户ID client_ip VARCHAR --客户端IP client_info VARCHAR --设备机型信息 platform VARCHAR --系统版本信息 imei VARCHAR --设备唯一标识 `version` VARCHAR --版本号 `action` VARCHAR --页面跳转描述 gpm VARCHAR --埋点链路 c_time VARCHAR --请求时间 target_type VARCHAR --目标类型 target_id VARCHAR --目标ID udata VARCHAR --扩展信息 , JSON格式 session_id VARCHAR --会话ID product_id_chain VARCHAR --商品ID串 cart_product_id_chain VARCHAR --加购商品ID tag VARCHAR --特殊标记 `position` VARCHAR --位置信息 network VARCHAR --网络使用情况 p_dt VARCHAR --时间分区天 p_platform VARCHAR --系统版本信息) WITH ( 'connector' = 'kafka' 'topic' = 'game_consume_log' 'properties.bootstrap.servers' = 'kafka.confluent.svc.cluster.local.c79f69095bc5d4d98b01136fe43e31b93:9071' 'properties.group.id' = 'gamegroup' 'format' = 'json' 'properties.ssl.truststore.location' = '/flink/usrlib/truststore.jks' 'properties.ssl.truststore.password' = '【your password】' 'properties.security.protocol'='SASL_SSL' 'properties.sasl.mechanism'='PLAIN' 'properties.sasl.jaas.config'='org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"【your user name】\" password=\"【your password】\";');--{\"account_id\":\"id1\"\"client_ip\":\"172.11.1.1\"\"client_info\":\"mi10\"\"p_dt\":\"2021-12-01\"\"c_time\":\"2021-12-01 19:10:00\"CREATE TABLE result_cps_total_summary_pvuv_min ( summary_date date --统计日期 summary_min varchar --统计分钟 pv bigint --点击量 uv bigint --一天内同个访客多次访问仅计算一个UV currenttime timestamp --当前时间 primary key (summary_date summary_min)) WITH ( type = 'rds' url = 'url = 'jdbc:mysql://rm-【your rds clusterId】.mysql.rds.aliyuncs.com:3306/confluent_vvp'' tableName = 'result_cps_total_summary_pvuv_min' userName = 'flink_confluent_vip' password = '【your rds password】');CREATE VIEW result_cps_total_summary_pvuv_min_01 ASselect cast (p_dt as date) as summary_date --时间分区count (client_ip) as pv --客户端的IPcount (distinct client_ip) as uv --客户端去重cast (max (c_time) as TIMESTAMP) as c_time --请求的时间from source_ods_fact_log_track_actiongroup by p_dt;INSERT into result_cps_total_summary_pvuv_minselect a.summary_date --时间分区 cast (DATE_FORMAT (c_time 'HH:mm') as varchar) as summary_min --取出小时分钟级别的时间 a.pv a.uv CURRENT_TIMESTAMP as currenttime --当前时间from result_cps_total_summary_pvuv_min_01 AS a; 点击【上线】之后 , 在作业运维页面点击启动按钮 , 直到状态更新为RUNNING状态 。在control center的【Topics-Messages】页面 , 逐条发送测试消息 , 格式为: {\"account_id\":\"id1\"\"client_ip\":\"72.11.1.111\"\"client_info\":\"mi10\"\"p_dt\":\"2021-12-01\"\"c_time\":\"2021-12-01 19:11:00\"{\"account_id\":\"id2\"\"client_ip\":\"72.11.1.112\"\"client_info\":\"mi10\"\"p_dt\":\"2021-12-01\"\"c_time\":\"2021-12-01 19:12:00\"{\"account_id\":\"id3\"\"client_ip\":\"72.11.1.113\"\"client_info\":\"mi10\"\"p_dt\":\"2021-12-01\"\"c_time\":\"2021-12-01 19:13:00\" 3.4 查看PV和UV效果 可以看出rds数据表的pv和uv会随着发送的消息数据 , 动态的变化 , 同时还可以通过【数据可视化】来查看相应的图表信息 。pv图表展示: uv图表展示: 原文链接:http://click.aliyun.com/m/1000332182/ 本文为阿里云原创内容 , 未经允许不得转载 。