[B 站公开课] 玩转数仓如此简单

教程 青牛 ⋅ 于 2023-12-19 11:34:46 ⋅ 143 阅读

公开课回放地址:https://www.bilibili.com/video/BV1wG411Y7dd

酷牛商城商品点击率 1.数据源准备

1.1 酷牛商城启动

启动酷牛商城镜像

地址:http://cloud.hainiubl.com/#/privateImageDetail?id=14167&imageType=private

添加共有镜像kettle、hive、zeepline

1.2 检查酷牛商城配置信息

1.2.1 检查nginx配置文件

#将ip改为coolniushop主机名
vim /usr/local/nginx/conf/sites-enabled/hainiushop.com
#启动nginx
/usr/local/nginx/sbin/nginx

file

1.2.2 修改项目配置

修改hainiushop_service项目和hainiushop_sso项目的mysql连接,改为你自己的数据库地址

vim /usr/local/apache-tomcat-service-8.0.20/webapps/hainiushop_service/WEB-INF/proxool.xml
vim /usr/local/apache-tomcat-shop-8.0.20/webapps/hainiushop_sso/WEB-INF/proxool.xml

1.2.3 启动tomcat

/usr/local/apache-tomcat-image-8.0.20/bin/startup.sh
/usr/local/apache-tomcat-service-8.0.20/bin/startup.sh
/usr/local/apache-tomcat-shop-8.0.20/bin/startup.sh

# 关闭tomcat
/usr/local/apache-tomcat-image-8.0.20/bin/shutdown.sh
/usr/local/apache-tomcat-service-8.0.20/bin/shutdown.sh
/usr/local/apache-tomcat-shop-8.0.20/bin/shutdown.sh

1.2.4 配置hosts文件

配置coolniushop 和windowss hosts文件如下:

file

浏览器访问www.hainiushop.com 效果如下:

file

日志产生目录:

file

浏览日志:

寻找第四个字段是以search.action?search.goodsTypeId=101开头的数据

file

点击日志:

file

file

1.3 将日志传输到hdfs目录,方便kettle机器进行下载并ETL处理

点击日志每天需要进行剪切备份,并每天重新生成新的文件用来接收新一天的请求

浏览日志在一个文件中

1.3.1 编写shell 脚本

创建shell脚本存放目录

mkdir -p /data/xinniu/coolniu/access/shell

编写公共配置脚本

vim log_cut_config.sh


#! /bin/bash
#       公共的配置
#access.log 日志位置
ACCESS_LOG_PATH=/data/hainiu_shop_access_log/access_shop.log
ACCESS_BASE_PATH=/data/xinniu/coolniu/access

# 输入点击日志备份目录
ACCESS_LOG_BAK_PATH=${ACCESS_BASE_PATH}/shop_bak
#输入service日志备份目录
# flume监控输入目录的根目录
FLUME_INPUT_BASE_PATH=${ACCESS_BASE_PATH}/work

编写剪切脚本

#! /bin/bash
# 将日志切割

echo "-----------start: "`date +%Y%m%d%H%M%S`" -----------"
echo 'step1:加载配置文件log_cut_config'
#确定当前脚本的位置
cd `dirname $0`
shell_base_path=`pwd`

#加载log_cut_config 文件
. ${shell_base_path}/log_cut_config.sh

echo 'step2:校验log_cut_config 文件的param 是否有空的'
#校验log_cut_config 文件的param 是否有空的,如果有,就终止脚本
#1:无效;0:有效
params_invalid=0

if [ "${ACCESS_LOG_PATH}x" == "x" ]; then
        params_invalid=1
fi

if [ "${ACCESS_LOG_BAK_PATH}x" == "x" ]; then
        params_invalid=1
fi

if [ "${FLUME_INPUT_BASE_PATH}x" == "x" ]; then
        params_invalid=1
fi

#如果有参数没配置,就停止脚本
if [ ${params_invalid} -eq 1 ]; then
        echo "log_cut_config shell config params error"
        exit
fi
echo 'step3:创建需要的目录'
#校验目录存不存在,如果不存在创建
#日志切割工作目录

if [ ! -d ${ACCESS_LOG_BAK_PATH} ]; then
        mkdir -p ${ACCESS_LOG_BAK_PATH}
fi
if [ ! -d ${FLUME_INPUT_BASE_PATH} ]; then
        mkdir -p ${FLUME_INPUT_BASE_PATH}
fi

TIMESTAMP=`date +%Y%m%d%H%M%S`
shop_file_name=shop_access_${TIMESTAMP}.log
echo 'step4:copy 日志数据到备份的目录'
cp ${ACCESS_LOG_PATH}  ${ACCESS_LOG_BAK_PATH}/${shop_file_name}

echo 'step5:mv 日志数据到flume监控的目录(.tmp文件'
mkdir -p ${FLUME_INPUT_BASE_PATH}/shop
mv ${ACCESS_LOG_PATH} ${FLUME_INPUT_BASE_PATH}/shop/access.log.tmp

echo 'step6:mv 日志数据将tmp去掉'
mv ${FLUME_INPUT_BASE_PATH}/shop/access.log.tmp ${FLUME_INPUT_BASE_PATH}/shop/access.log

echo 'step7:kill -USR nginx master进程,让nginx重新生成日志'
PID=`ps -aux | grep nginx | grep master | awk '{print $2}'`
if [ "${PID}x" != "x" ]; then
        kill -USR1 $PID

fi
echo 'step7:删除2天前的备份数据'
#删除2天前的备份文件,  21号的删19号的
delet_date=`date -d 2' day ago' +%Y%m%d`
rm -rf ${ACCESS_LOG_BAK_PATH}/shop_access_${delet_date}*.log

echo "-----------end: "`date +%Y%m%d%H%M%S`" -----------"

1.3.2 安装flume,并配置hadoop环境

将flume安装包解压到到/usr/local目录下

tar -zxvf /public/software/bigdata/apache-flume-1.10.1-bin.tar.gz  -C /usr/local/

file

创建flume的软链接

ln -s /usr/local/apache-flume-1.10.1-bin/ /usr/local/flume

file

修改flume中log4j2.xml配置文件,将LogFile改为Console

file

修改flume运行时占用jvm堆内存大小,修改flume-env.sh.template

mv /usr/local/flume/conf/flume-env.sh.template  /usr/local/flume/conf/flume-env.sh
#添加
export JAVA_OPTS="-Xms512m -Xmx1024m -Dcom.sun.management.jmxremote"

将hive机器中的hadoop安装包远程拷贝到coolniushop机器的/opt目录下

scp -r /opt/jdk1.8.0_144/  coolniushop:/opt/
scp -r /opt/hadoop-2.7.3/ coolniushop:/opt/

配置环境变量:

export JAVA_HOME=/opt/jdk1.8.0_144/
export PATH=$PATH:$JAVA_HOME/bin
export FLUME_HOME=/usr/local/flume
export PATH=$PATH:$FLUME_HOME/bin
export HADOOP_HOME=/opt/hadoop-2.7.3
export HADOOP_CONF_HOME=/opt/hadoop-2.7.3/etc/hadoop
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH

#让环境变量生效
source /etc/profile

1.3.3 编写flume agent将日志收集到hdfs

file

点击日志文件配置如下:

#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#定义source
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/data/xinniu/coolniu/access/work/shop
a1.sources.r1.fileHeader = true
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.deletePolicy = immediate
# 跳过.tmp结尾的文件
a1.sources.r1.ignorePattern = ^(.)*\\.tmp$

#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100

#定义sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://ns1/data/xinniu/coolniu/etlbefore/%Y%m%d/LOOK_GOODS_DETAIL_INFO
a1.sinks.k1.hdfs.filePrefix = access_LOOK_GOODS_DETAIL_INFO
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 0
# 当达到50M,文件关闭,创建新文件写入
a1.sinks.k1.hdfs.rollSize = 52428800
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.callTimeout = 0
a1.sinks.k1.hdfs.idleTimeout = 52428800
#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

浏览日志如下:

#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#定义source
a1.sources.r1.type=taildir
a1.sources.r1.positionFile = /root/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /data/hainiu_shop_service_log/service.log
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=timestamp

#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100

#定义sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://ns1/data/xinniu/coolniu/etlbefore/%Y%m%d/SHOW_GOODS_BRAND_INFO
a1.sinks.k1.hdfs.filePrefix = access_SHOW_GOODS_BRAND_INFO
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 0
# 当达到50M,文件关闭,创建新文件写入
a1.sinks.k1.hdfs.rollSize = 52428800
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.callTimeout = 0
a1.sinks.k1.hdfs.idleTimeout = 52428800

#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动flume

#启动两个agent 
flume-ng agent -n a1 -c /usr/local/flume/conf/ -f /data/xinniu/coolniu/access/flume/access_shop_log.agent -Dflume.root.logger=INFO,console

 flume-ng agent -n a1 -c /usr/local/flume/conf/ -f /data/xinniu/coolniu/access/flume/access_shop_service.agent -Dflume.root.logger=INFO,console

hdfs数据收集如下:

file

file

2 kettle服务器数据处理

2.1 安装flume 并配置hadoop

将flume安装包解压到到/usr/local目录下

scp -r  coolniushop:/usr/local/apache-flume-1.10.1-bin /usr/local/

file

创建flume的软链接

ln -s /usr/local/apache-flume-1.10.1-bin/ /usr/local/flume

file

将hive机器中的hadoop安装包远程拷贝到coolniushop机器的/opt目录下

scp -r  coolniushop:/opt/hadoop-2.7.3 /opt/
scp -r  coolniushop:/opt/jdk1.8.0_144 /opt/

配置环境变量:

export FLUME_HOME=/usr/local/flume
export PATH=$PATH:$FLUME_HOME/bin
export HADOOP_HOME=/opt/hadoop-2.7.3
export HADOOP_CONF_HOME=$HADOOP_HOME/etc/hadoop
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH:/opt/kettle/data-integration

#让环境变量生效
source /etc/profile

2.2 kettle ETL处理

#首先确定kettle处理的数据输入路径
mkdir -p /data/xinniu/coolniu/access/input/shop
mkdir -p /data/xinniu/coolniu/access/input/service
mkdir -p /data/xinniu/coolniu/access/shell

file

2.2.1 编写脚本从hadoop拉去数据文件

#!/bin/bash
batch_date=`date  +%Y%m%d`
file_log=/data/xinniu/coolniu/access/input/shop/access.log
file_show=/data/xinniu/coolniu/access/input/service/service.log

if [ -f "$file_log" ];then
rm -rf $file_log
echo "$file_log 已删除"
else
echo "$file_log 不存在可以下载数据了"
fi

if [ -f "$file_show" ];then
rm -rf $file_show
echo "$file_show 已删除"
else
echo "$file_show 不存在可以下载数据了"
fi

hadoop fs -get hdfs://ns1/data/xinniu/coolniu/etlbefore/${batch_date}/SHOW_GOODS_BRAND_INFO/access_SHOW_GOODS_BRAND_INFO*    /data/xinniu/coolniu/access/input/service/service.log

hadoop fs -get hdfs://ns1/data/xinniu/coolniu/etlbefore/${batch_date}/LOOK_GOODS_DETAIL_INFO/access_LOOK_GOODS_DETAIL_INFO* /data/xinniu/coolniu/access/input/shop/access.log

2.2.2 kettle ETL 处理流程

点击日志:

file

文本文件输入:

file

file

file

重要字段筛选:

file

增加常量:

file

java代码:

import java.util.Date;
import java.util.Locale;
import java.text.SimpleDateFormat;
import java.text.ParseException;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
//--用来进行日期格式化的 sdf1 日期转化成date类型 sdf2 date类型转化成string类型
SimpleDateFormat sdf1;
SimpleDateFormat sdf2; 
// 不能加泛型 保存areacode文件汇总的地址码
List areaList;
//-用来坐标记
boolean first = true;
// 一行执行一次
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{

    // 拿到一行数据
    Object[] r = getRow();

    // If the row object is null, we are done processing.
    //
    if (r == null) {
      setOutputDone();
      return false;
    }

    if (first) {
      // 读第一行时执行
      sdf1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
      sdf2 = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");

      first=false;
    }

    // It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
    // enough to handle any new fields you are creating in this step.
    // 创建要输出的一行
    Object[] outputRow = createOutputRow(r, data.outputRowMeta.size());

    // 获取timestr的值  14/Sep/2020:14:43:38 +0800
    String timestr = get(Fields.In, "req_time").getString(r);

    //--str   14/Sep/2020:14:43:38
    String str = timestr.substring(0, timestr.indexOf(" "));
//          timestr.split(" ")[0]

    // YYYY-MM-dd HH:mm:ss 格式的数据
   //--2023-08-17 12:45:22
    String format = "";
    try {
           Date date = sdf1.parse(str);
           format = sdf2.format(date);
         } catch (ParseException e) {
          // e.printStackTrace();

         }
    // 给 req_time 字段设置值
    get(Fields.Out, "req_time").setValue(outputRow, format);

    String reqstr = get(Fields.In, "req_url").getString(r);
    reqstr = reqstr.substring(0, reqstr.lastIndexOf(" "));
    get(Fields.Out, "req_url").setValue(outputRow, reqstr);  
    //--http://www.hainiushop.com/search.action?search.goodsTypeId=101&typeListStyle=1
    String refer = get(Fields.In, "refer").getString(r);

     if(reqstr.startsWith("GET /lookDetail.action") && refer.equals("http://www.hainiushop.com/search.action?search.goodsTypeId=101&typeListStyle=1")){
        String[] arr = reqstr.split(" ");
        // arr2: [/lookDetail.action?id, 31]
        String[] arr2 = arr[1].split("=|&");
        if(arr2.length >= 2){
           get(Fields.Out, "goods_id").setValue(outputRow, arr2[1]);
        }
    }

    // putRow will send the row on to the default output hop.
    //
    putRow(data.outputRowMeta, outputRow);

    return true;
}

过滤数据:

file

数据导出:

file

file

file

展示日志:

file

文件输入

file

file

file

过滤 服 类点击日志:

file

剪切成json数组:

file

数据输出:

file

file

file

2.2.3 上传ktr文件到服务器的access目录下,通过pan命令执行

file

pan.sh  -file=/data/xinniu/coolniu/access/access.ktr -level=Detailed
pan.sh  -file=/data/xinniu/coolniu/access/access_service.ktr -level=Detailed

观察output目录是否有数据产生:

file

点击日志:

file

展示日志:

file

2.3 编写脚本,备份ETL输出数据

log_cut_config.sh

#! /bin/bash

#       公共的配置
ACCESS_BASE_PATH=/data/xinniu/coolniu/access
# 输入日志的备份目录,只备份最近3天的日志
ACCESS_OUTPUT_SHOP_PATH=${ACCESS_BASE_PATH}/output/look_goods_detail_info
ACCESS_OUTPUT_SERVICE_PATH=${ACCESS_BASE_PATH}/output/show_goods_brand_info
FLUME_INPUT_SHOP_PATH=${ACCESS_BASE_PATH}/work/look_goods_detail_info
FLUME_INPUT_SERVICE_PATH=${ACCESS_BASE_PATH}/work/show_goods_brand_info
ACCESS_OUTPUT_BAK_SHOP_PATH=${ACCESS_BASE_PATH}/bak/look_goods_detail_info
ACCESS_OUTPUT_BAK_SERVICE_PATH=${ACCESS_BASE_PATH}/bak/show_goods_brand_info

log_mv.sh

#! /bin/bash
# 将日志切割

echo "-----------start: "`date +%Y%m%d%H%M%S`" -----------"
echo 'step1:加载配置文件log_cut_config'
#确定当前脚本的位置
cd `dirname $0`
shell_base_path=`pwd`

#加载log_cut_config 文件
. ${shell_base_path}/log_cut_config.sh

echo 'step2:校验log_cut_config 文件的param 是否有空的'
#校验log_cut_config 文件的param 是否有空的,如果有,就终止脚本
#1:无效;0:有效
params_invalid=0

if [ "${ACCESS_OUTPUT_SHOP_PATH}x" == "x" ]; then
        params_invalid=1
fi

if [ "${ACCESS_OUTPUT_SERVICE_PATH}x" == "x" ]; then
        params_invalid=1
fi
if [ "${FLUME_INPUT_SHOP_PATH}x" == "x" ]; then
        params_invalid=1
fi

if [ "${FLUME_INPUT_SERVICE_PATH}x" == "x" ]; then
        params_invalid=1
fi
if [ "${ACCESS_OUTPUT_BAK_SHOP_PATH}x" == "x" ]; then
        params_invalid=1
fi

if [ "${ACCESS_OUTPUT_BAK_SERVICE_PATH}x" == "x" ]; then
        params_invalid=1
fi

#如果有参数没配置,就停止脚本
if [ ${params_invalid} -eq 1 ]; then
        echo "log_cut_config shell config params error"
        exit
fi
mkdir -p ${ACCESS_OUTPUT_SHOP_PATH}
mkdir -p ${ACCESS_OUTPUT_SERVICE_PATH}
mkdir -p ${FLUME_INPUT_SHOP_PATH}
mkdir -p ${FLUME_INPUT_SERVICE_PATH}
mkdir -p ${ACCESS_OUTPUT_BAK_SHOP_PATH}
mkdir -p ${ACCESS_OUTPUT_BAK_SERVICE_PATH}

echo 'step4:copy 结构化数据到flume监控的目录(.tmp文件)+ 修改777'
cp ${ACCESS_OUTPUT_SHOP_PATH}/look_goods_detail_info.csv ${FLUME_INPUT_SHOP_PATH}/look_goods_detail_info.csv.tmp
cp ${ACCESS_OUTPUT_SERVICE_PATH}/show_goods_brand_info.csv ${FLUME_INPUT_SERVICE_PATH}/show_goods_brand_info.csv.tmp

echo 'step5:重命名flume监控的目录的(.tmp文件)'
mv ${FLUME_INPUT_SHOP_PATH}/look_goods_detail_info.csv.tmp  ${FLUME_INPUT_SHOP_PATH}/look_goods_detail_info.csv
mv ${FLUME_INPUT_SERVICE_PATH}/show_goods_brand_info.csv.tmp ${FLUME_INPUT_SERVICE_PATH}/show_goods_brand_info.csv

echo 'step6:mv 结构化数据到bak 目录下'
TIMESTAMP=`date +%Y%m%d%H%M%S`
mv ${ACCESS_OUTPUT_SHOP_PATH}/look_goods_detail_info.csv ${ACCESS_OUTPUT_BAK_SHOP_PATH}/${TIMESTAMP}_look_goods_detail_info.csv
mv ${ACCESS_OUTPUT_SERVICE_PATH}/show_goods_brand_info.csv ${ACCESS_OUTPUT_BAK_SERVICE_PATH}/${TIMESTAMP}_show_goods_brand_info.csv
echo 'step7:删除2天前的备份数据'
#删除2天前的备份文件,  21号的删19号的
delet_date=`date -d 2' day ago' +%Y%m%d`
rm -rf ${ACCESS_OUTPUT_BAK_SHOP_PATH}/${delet_date}*.csv
rm -rf ${ACCESS_OUTPUT_BAK_SERVICE_PATH}/${delet_date}*.csv
echo "-----------end: "`date +%Y%m%d%H%M%S`" -----------"

2.4 编写flume agent将ETL的数据上传到hdfs

运行agent之前需要将编译好的flume-ng-core包上传到lib目录下

运行agent

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f /data/xinniu/coolniu/access/flume/access_shop_log.agent -Dflume.root.logger=INFO,console

access_shop_log.agent

# Name the components on this agent
a1.sources = r1 r2 
a1.sinks = k1 k2 
a1.channels = c1 c2 

#-------r1--c1--k1-----login_table---------
# r1 sources类型
a1.sources.r1.type  =  spooldir
# 指定监控的目录
a1.sources.r1.spoolDir = /data/xinniu/coolniu/access/work/look_goods_detail_info/
a1.sources.r1.fileHeader = true
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.deletePolicy = immediate
# 跳过.tmp结尾的文件
a1.sources.r1.ignorePattern = ^(.)*\\.tmp$    
# hdfs sink-k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://ns1/data/xinniu/coolniu/etlout/%Y%m%d/LOOK_GOODS_DETAIL_INFO
a1.sinks.k1.hdfs.filePrefix = access_LOOK_GOODS_DETAIL_INFO
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 0
# 当达到50M,文件关闭,创建新文件写入
a1.sinks.k1.hdfs.rollSize = 52428800
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.callTimeout = 0
a1.sinks.k1.hdfs.idleTimeout = 60

a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000000
a1.channels.c1.transactionCapacity = 1000000

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

#-------r2--c2--k2-----look_goods_detail_info---------
# r2 sources类型
a1.sources.r2.type  =  spooldir
# 指定监控的目录
a1.sources.r2.spoolDir = /data/xinniu/coolniu/access/work/show_goods_brand_info/
a1.sources.r2.fileHeader = true
a1.sources.r2.interceptors = i1
a1.sources.r2.interceptors.i1.type = timestamp
#如果想传完删掉,可以配置
#如果打开这个配置,cp mv 同样的文件到监控的目录下,也可以归集
#如果关闭这个配置,cp mv 同样的文件到监控的目录下,抛异常
a1.sources.r2.deletePolicy = immediate
# 跳过.tmp结尾的文件
a1.sources.r2.ignorePattern = ^(.)*\\.tmp$    
# hdfs sink-k2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://ns1/data/xinniu/coolniu/etlout/%Y%m%d/SHOW_GOODS_BRAND_INFO
a1.sinks.k2.hdfs.filePrefix = access_SHOW_GOODS_BRAND_INFO
a1.sinks.k2.hdfs.fileSuffix = .log
a1.sinks.k2.hdfs.writeFormat = Text
a1.sinks.k2.hdfs.rollInterval = 0
# 当达到50M,文件关闭,创建新文件写入
a1.sinks.k2.hdfs.rollSize = 52428800
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.useLocalTimeStamp = true
a1.sinks.k2.hdfs.callTimeout = 0
a1.sinks.k2.hdfs.idleTimeout = 60

a1.channels.c2.type = memory
a1.channels.c2.capacity = 100000000
a1.channels.c2.transactionCapacity = 1000000

a1.sources.r2.channels = c2
a1.sinks.k2.channel = c2

3 hive数仓搭建

3.0 安装sqoop

1 将sqoop安装包解压到/usr/local目录下

tar -zxvf /public/software/bigdata/sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz -C /usr/local/

2 创建软链接

ln -s /usr/local/sqoop-1.4.7.bin__hadoop-2.6.0/ /usr/local/sqoop
#安装hadoop环境
scp -r  coolniushop:/opt/hadoop-2.7.3 /opt/

export SQOOP_HOME=/usr/local/sqoop
export PATH=$SQOOP_HOME/bin:$PATH
export HADOOP_HOME=/opt/hadoop-2.7.3
export HADOOP_CONF_HOME=$HADOOP_HOME/etc/hadoop
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH

file

3.1 T层脚本

创建目录;/data/xinniu/coolniu/shell/

mkdir -p /data/xinniu/coolniu/access/log/step1-etl

并在hive中创建所需库

xinniu_tmp

xinniu_itl

xinniu_iol

xinniu_iml

xinniu_icl

access_look_goods_detail_info.sh脚本如下:


batch_date=$1
now_date=`date -d "${batch_date} 1day" +%Y%m%d`
# look_goods_detail_info
hive -e" 

drop table if exists xinniu_tmp.look_goods_detail_info;

create external table if not exists xinniu_tmp.look_goods_detail_info(
ip string,
looktime string,
goodsid string,
refer string,
browse string,
userid string,
username string
)
row format delimited fields terminated by '\t'
location '/data/xinniu/coolniu/etlout/${now_date}/LOOK_GOODS_DETAIL_INFO/'
;
"

res=$?
if [ ${res} != 0 ];then
echo 'load look_goods_detail_info data error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_look_goods_detail_info.log
exit 1
else
echo 'load look_goods_detail_info data successful! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_look_goods_detail_info.log
fi

# look_goods_detail_info建表语句
hive -e" create table if not exists xinniu_itl.look_goods_detail_info (
ip string,
looktime string,
goodsid string,
refer string,
browse string,
userid string,
username string
)
partitioned by (pt string)
stored as parquet
tblproperties ('parquet.compress'='SNAPPY')
;
"
res=$?
if [ ${res} != 0 ];then
echo 'load look_goods_detail_info ddl error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_look_goods_detail_info.log
exit 1
else
echo 'load look_goods_detail_info ddl successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_look_goods_detail_info.log
fi

hive -e "
 set hive.exec.dynamic.partition.mode=nonstrict ;
insert into xinniu_itl.look_goods_detail_info partition (pt) select t.* , '${batch_date}' pt from xinniu_tmp.look_goods_detail_info t;
"
res=$?
if [ ${res} != 0 ];then
echo 'load log2xinniu_itl error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_look_goods_detail_info.log
exit 1
else
echo 'load log2xinniu_itl successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_look_goods_detail_info.log
fi

access_show_goods_brand_info.sh脚本如下:

batch_date=$1
now_date=`date -d "${batch_date} 1day" +%Y%m%d`

# look_goods_detail_info
hive -e" 

drop table if exists xinniu_tmp.show_goods_brand_info;

create external table if not exists xinniu_tmp.show_goods_brand_info(
json string
)
row format delimited fields terminated by '\t'
location '/data/xinniu/coolniu/etlout/${now_date}/SHOW_GOODS_BRAND_INFO/'
;
"

res=$?
if [ ${res} != 0 ];then
echo 'load show_goods_brand_info data error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_show_goods_brand_info.log
exit 1
else
echo 'load show_goods_brand_info data successful! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_show_goods_brand_info.log
fi

hive -e" create table if not exists xinniu_itl.show_goods_brand_info (
goodsTagNames     string,
goodsSn           string,
goodsTypeId       string,
brandId           string,
saleState         string,
isGift            string,
isPromote         string,
isCod             string,
isReturns         string,
isMerger          string,
extendAttr        string,
keyValue          string,
goodsId           string,
goodsCname        string,
goodsEname        string,
goodsImg          string,
goodsPrice        string,
goodsColor        string,
goodsSize         string,
locality          string,
brandSizeId       string,
stockNumber       string,
warmNumber        string,
collectNumber     string,
brandCname        string,
brandEname        string,
giftId            string,
giftNumberMax     string,
giftPrice         string,
giftNumberRemain  string,
endDate           string,
remainHour        string,
typeCname         string
)
partitioned by (pt string)
stored as parquet
tblproperties ('parquet.compress'='SNAPPY')
;
"
res=$?
if [ ${res} != 0 ];then
echo 'load show_goods_brand_info ddl error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_show_goods_brand_info.log
exit 1
else
echo 'load show_goods_brand_info ddl successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_show_goods_brand_info.log
fi

hive -e "
alter table xinniu_itl.show_goods_brand_info drop if exists partition (pt = '${batch_date}');
insert into xinniu_itl.show_goods_brand_info partition (pt='${batch_date}') 
select json_tuple(json,'goodsTagNames','goodsSn','goodsTypeId','brandId','saleState','isGift',
'isPromote','isCod','isReturns','isMerger','extendAttr','keyValue','goodsId','goodsCname',
'goodsEname','goodsImg','goodsPrice','goodsColor','goodsSize','locality','brandSizeId','stockNumber','warmNumber',
'collectNumber','brandCname','brandEname','giftId','giftNumberMax','giftPrice','giftNumberRemain','endDate',
'remainHour','typeCname')  as (goodsTagNames,goodsSn,goodsTypeId,brandId,saleState,isGift,
isPromote,isCod,isReturns,isMerger,extendAttr,keyValue,goodsId,goodsCname,goodsEname,goodsImg,
goodsPrice,goodsColor,goodsSize,locality,brandSizeId,stockNumber,warmNumber,collectNumber,brandCname,
brandEname,giftId,giftNumberMax,giftPrice,giftNumberRemain,endDate,remainHour,typeCname) from 
(select explode(split(regexp_replace(regexp_replace(json, '\\[|\\]',''),'\\}\\,\\{','\\}\\|\\{'),'\\|')) as json from xinniu_tmp.show_goods_brand_info) t;
"

res=$?
if [ ${res} != 0 ];then
echo 'insert  show_goods_brand_info ddl error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_show_goods_brand_info.log
exit 1
else
echo 'insert  show_goods_brand_info ddl successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/access_show_goods_brand_info.log
fi

shop_goods.sh 脚本如下:


batch_date=$1

kinit -kt /data/xinniu.keytab xinniu
sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://coolniushop:3306/hainiu_shop \
--username root \
--password hainiu \
--target-dir /data/xinniu/coolniu/access/sqoop/shop_goods/${batch_date}/ \
--num-mappers 1 \
--delete-target-dir \
--hive-drop-import-delims \
--fields-terminated-by "\t" \
--split-by Id \
--query 'select shop_goods.*, current_date etldate from shop_goods where $CONDITIONS'

res=$?
if [ ${res} != 0 ];then
echo 'extract shop_goods error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods.log
exit 1
else
echo 'extract shop_goods successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods.log
fi

hive -e "drop table if exists xinniu_tmp.shop_goods;create external table xinniu_tmp.shop_goods (
Id string,
goods_sn string,
goods_cname string,
goods_ename string,
locality string,
goods_price string,
goods_color string,
goods_size string,
brand_size_id string,
size_details string,
stock_number string,
warm_number string,
sold_number string,
collect_number string,
goods_weight string,
goods_volume string,
tags string,
goods_attr string,
gdetails_img string,
goods_cndetails string,
goods_endetails string,
key_value string,
sale_state string,
goods_img string,
is_gift string,
is_promote string,
is_cod string,
is_returns string,
is_merger string,
extend_attr string,
last_update_time string,
add_time string,
admin_uid string,
is_delete
 string,
etldate string
)
row format delimited fields terminated by '\t'
location '/data/xinniu/coolniu/access/sqoop/shop_goods/${batch_date}';
"

res=$?
if [ ${res} != 0 ];then
echo 'create shop_goods tmp table error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods.log
exit 1
else
echo 'create shop_goods tmp table successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods.log
fi

hive -e "create table if not exists xinniu_itl.shop_goods (
Id string,
goods_sn string,
goods_cname string,
goods_ename string,
locality string,
goods_price string,
goods_color string,
goods_size string,
brand_size_id string,
size_details string,
stock_number string,
warm_number string,
sold_number string,
collect_number string,
goods_weight string,
goods_volume string,
tags string,
goods_attr string,
gdetails_img string,
goods_cndetails string,
goods_endetails string,
key_value string,
sale_state string,
goods_img string,
is_gift string,
is_promote string,
is_cod string,
is_returns string,
is_merger string,
extend_attr string,
last_update_time string,
add_time string,
admin_uid string,
is_delete
 string,
etldate string
)
partitioned by (pt string)
stored as parquet
tblproperties ('parquet.compress'='SNAPPY')
;"

res=$?
if [ ${res} != 0 ];then
echo 'create shop_goods itl table error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods.log
exit 1
else
echo 'create shop_goods itl table successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods.log
fi

hive -e "alter table xinniu_itl.shop_goods drop if exists partition (pt = '${batch_date}');"
hive -e "set hive.exec.dynamic.partition.mode=nonstrict ;insert into table xinniu_itl.shop_goods partition (pt) select a.*,'${batch_date}' from xinniu_tmp.shop_goods a;"

res=$?
if [ ${res} != 0 ];then
echo 'load shop_goods data to itl table error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods.log
exit 1
else
echo 'load shop_goods data to itl table successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods.log
fi

shop_goods_relate.sh 脚本如下:


batch_date=$1

sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://coolniushop:3306/hainiu_shop \
--username root \
--password hainiu \
--target-dir /data/xinniu/coolniu/access/sqoop/shop_goods_relate/${batch_date}/ \
--delete-target-dir \
--num-mappers 1 \
--hive-drop-import-delims \
--fields-terminated-by "\t" \
--split-by goods_id \
--query 'select shop_goods_relate.*, current_date etldate from shop_goods_relate where $CONDITIONS'

res=$?
if [ ${res} != 0 ];then
echo 'extract shop_goods_relate error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods_relate.log
exit 1
else
echo 'extract shop_goods_relate successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods_relate.log
fi

kinit -kt /data/impala.keytab impala
hive -e "drop table if exists xinniu_tmp.shop_goods_relate;create external table xinniu_tmp.shop_goods_relate (
goods_id string,
brand_id string,
goods_type_id string,
brand_catalog_id string,
relate_goods_ids string,
relate_news_ids
 string,
etldate string
)
row format delimited fields terminated by '\t'
location '/data/xinniu/coolniu/access/sqoop/shop_goods_relate/${batch_date}';
"

res=$?
if [ ${res} != 0 ];then
echo 'create shop_goods_relate tmp table error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods_relate.log
exit 1
else
echo 'create shop_goods_relate tmp table successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods_relate.log
fi

kinit -kt /data/impala.keytab impala
hive -e"create table if not exists xinniu_itl.shop_goods_relate (
goods_id string,
brand_id string,
goods_type_id string,
brand_catalog_id string,
relate_goods_ids string,
relate_news_ids
 string,
etldate string
)
partitioned by (pt string)
stored as parquet
tblproperties ('parquet.compress'='SNAPPY')
;"

res=$?
if [ ${res} != 0 ];then
echo 'create shop_goods_relate itl table error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods_relate.log
exit 1
else
echo 'create shop_goods_relate itl table successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods_relate.log
fi

kinit -kt /data/impala.keytab impala
hive -e "alter table xinniu_itl.shop_goods_relate drop if exists partition (pt = '${batch_date}');"
hive -e "set hive.exec.dynamic.partition.mode=nonstrict ;insert into table xinniu_itl.shop_goods_relate partition (pt) select a.*,'${batch_date}' from xinniu_tmp.shop_goods_relate a;"

res=$?
if [ ${res} != 0 ];then
echo 'load shop_goods_relate data to itl table error! '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods_relate.log
exit 1
else
echo 'load shop_goods_relate data to itl table successful '`date` >> /data/xinniu/coolniu/access/log/step1-etl/shop_goods_relate.log
fi

3.2 O层脚本

mkdir -p /data/xinniu/coolniu/access/log/step2-iol

look_goods_detail.sh脚本如下

#!/bin/bash
batch_date=$1
# 建表
hive -e"
create table if not exists xinniu_iol.look_goods_detail (
ip string,
looktime string,
goodsid string,
refer string,
browse string,
userid string,
username string,
goods_sn string,
goods_cname string,
goods_ename string
)
partitioned by (pt string)
stored as parquet
tblproperties ('parquet.compress'='SNAPPY');
"

res=$?
if [ ${res} != 0 ];then
echo 'create look_goods_detail table error! '`date` >> /data/xinniu/coolniu/access/log/step2-iol/look_goods_detail.log
exit 1
else
echo 'create look_goods_detail table success '`date` >> /data/xinniu/coolniu/access/log/step2-iol/look_goods_detail.log
fi

# 删除分区,查询导入数据
hive -e "
alter table xinniu_iol.look_goods_detail drop if exists partition (pt = '${batch_date}');
set hive.exec.dynamic.partition.mode=nonstrict ;
insert into table xinniu_iol.look_goods_detail partition(pt='${batch_date}')
select
t1.ip,
t1.looktime,
t1.goodsid,
t1.refer,
t1.browse,
t1.userid,
t1.username,
t2.goods_sn,
t2.goods_cname,
t2.goods_ename
from 
(select * from xinniu_itl.look_goods_detail_info where pt ='${batch_date}') t1 
left join 
(select Id, goods_sn, goods_cname, goods_ename from xinniu_itl.shop_goods where pt='${batch_date}') t2 
on t1.goodsid=t2.Id;
" 
res=$?
if [ ${res} != 0 ];then
echo 'look_goods_detail drop partition and insert error! '`date` >> /data/xinniu/coolniu/access/log/step2-iol/look_goods_detail.log
exit 1
else
echo 'look_goods_detail drop partition and insert success '`date` >> /data/xinniu/coolniu/access/log/step2-iol/look_goods_detail.log
fi

echo "----------------------------------------\n" >> /data/xinniu/coolniu/access/log/step2-iol/look_goods_detail.log

show_goods_brand.sh脚本如下:

#!/bin/bash
batch_date=$1
# 建表

hive -e" create table if not exists xinniu_iol.show_goods_brand_info (
goodsSn           string,
goodsTypeId       string,
brandId           string,
goodsId           string,
goodsCname        string,
goodsEname        string,
goodsPrice        string,
goodsColor        string,
goodsSize         string,
brandCname        string,
brandEname        string,
typeCname         string
)
partitioned by (pt string)
stored as parquet
tblproperties ('parquet.compress'='SNAPPY')
;
"

res=$?
if [ ${res} != 0 ];then
echo 'create xinniu_iol.show_goods_brand_info table error! '`date` >> /data/xinniu/coolniu/access/log/step2-iol/xinniu_iol.show_goods_brand_info.log
exit 1
else
echo 'create xinniu_iol.show_goods_brand_info table success '`date` >> /data/xinniu/coolniu/access/log/step2-iol/xinniu_iol.show_goods_brand_info.log
fi

# 删除分区,查询导入数据
hive -e "
alter table xinniu_iol.show_goods_brand_info drop if exists partition (pt = '${batch_date}');
set hive.exec.dynamic.partition.mode=nonstrict ;
insert into table xinniu_iol.show_goods_brand_info partition(pt='${batch_date}')
select goodsSn,goodsTypeId,brandId,goodsId,goodsCname,goodsEname,goodsPrice,goodsColor,goodsSize,brandCname,brandEname,typeCname
from xinniu_itl.show_goods_brand_info
" 
res=$?
if [ ${res} != 0 ];then
echo 'xinniu_iol.show_goods_brand_info add partition and insert error! '`date` >> /data/xinniu/coolniu/access/log/step2-iol/xinniu_iol.show_goods_brand_info.log
exit 1
else
echo 'xinniu_iol.show_goods_brand_info add partition and insert success '`date` >> /data/xinniu/coolniu/access/log/step2-iol/xinniu_iol.show_goods_brand_info.log
fi

echo "----------------------------------------\n" >> /data/xinniu/coolniu/access/log/step2-iol/xinniu_iol.show_goods_brand_info.log

3.3 M层脚本

mkdir -p /data/xinniu/coolniu/access/log/step3-iml

look_goods_detail_fact.sh脚本如下

#!/bin/bash
batch_date=$1

# 建表
hive -e"

create table if not exists xinniu_iml.look_goods_detail_fact (
ip string,
looktime string,
goodsid string,
refer string,
browse string,
userid string,
username string,
goods_sn string,
goods_cname string,
goods_ename string,
goods_type_id string
)
partitioned by (pt string)
stored as parquet
tblproperties ('parquet.compress'='SNAPPY');
"

res=$?
if [ ${res} != 0 ];then
echo 'create look_goods_detail_fact table error! '`date` >> /data/xinniu/coolniu/access/log/step3-iml/look_goods_detail_fact.log
exit 1
else
echo 'create look_goods_detail_fact table success '`date` >> /data/xinniu/coolniu/access/log/step3-iml/look_goods_detail_fact.log
fi

# 删除分区,查询导入数据

hive -e "

alter table xinniu_iml.look_goods_detail_fact drop if exists partition (pt = '${batch_date}');
set hive.exec.dynamic.partition.mode=nonstrict ;
insert into table xinniu_iml.look_goods_detail_fact partition(pt='${batch_date}')
select 
t1.ip,
t1.looktime,
t1.goodsid,
t1.refer,
t1.browse,
t1.userid,
t1.username,
t1.goods_sn,
t1.goods_cname,
t1.goods_ename,
t3.goods_type_id
from 
(select * from xinniu_iol.look_goods_detail where pt='${batch_date}') t1 
left join 
(select * from xinniu_itl.shop_goods_relate where pt='${batch_date}') t3 
on t1.goodsid=t3.goods_id;

" 
res=$?
if [ ${res} != 0 ];then
echo 'look_goods_detail_fact drop partition and insert error! '`date` >> /data/xinniu/coolniu/access/log/step3-iml/look_goods_detail_fact.log
exit 1
else
echo 'look_goods_detail_fact drop partition and insert success '`date` >> /data/xinniu/coolniu/access/log/step3-iml/look_goods_detail_fact.log
fi

echo "----------------------------------------\n" >> /data/xinniu/coolniu/access/log/step3-iml/look_goods_detail_fact.log

show_goods_brand_fact.sh脚本如下:

#!/bin/bash
batch_date=$1
mkdir -p  /data/xinniu/coolniu/access/log/step3-iml
# 建表
hive -e"

create table if not exists xinniu_iml.show_goods_brand_fact (
goodsSn           string,
goodsTypeId       string,
brandId           string,
goodsId           string,
goodsCname        string,
goodsEname        string,
goodsPrice        string,
goodsColor        string,
goodsSize         string,
brandCname        string,
brandEname        string,
typeCname         string
)
partitioned by (pt string)
stored as parquet
tblproperties ('parquet.compress'='SNAPPY');
"

res=$?
if [ ${res} != 0 ];then
echo 'create show_goods_brand_fact table error! '`date` >> /data/xinniu/coolniu/access/log/step3-iml/show_goods_brand_fact.log
exit 1
else
echo 'create show_goods_brand_fact table success '`date` >> /data/xinniu/coolniu/access/log/step3-iml/show_goods_brand_fact.log
fi

# 删除分区,查询导入数据

hive -e "

alter table xinniu_iml.show_goods_brand_fact drop if exists partition (pt = '${batch_date}');
set hive.exec.dynamic.partition.mode=nonstrict ;
insert into table xinniu_iml.show_goods_brand_fact partition(pt)
select * from xinniu_iol.show_goods_brand_info where pt='${batch_date}';
" 
res=$?
if [ ${res} != 0 ];then
echo 'show_goods_brand_fact drop partition and insert error! '`date` >> /data/xinniu/coolniu/access/log/step3-iml/show_goods_brand_fact.log
exit 1
else
echo 'show_goods_brand_fact drop partition and insert success '`date` >> /data/xinniu/coolniu/access/log/step3-iml/show_goods_brand_fact.log
fi

echo "----------------------------------------\n" >> /data/xinniu/coolniu/access/log/step3-iml/show_goods_brand_fact.log

3.4 C层脚本

mkdir -p /data/xinniu/coolniu/access/log/step5-idl
mkdir -p /data/xinniu/coolniu/export/

创建hainiu_dw数据库

create database hainiu_dw;

goods_click_rate.sh 脚本如下:

batch_date=$1

# mysql表名称
table_name=xinniu_shop_goods_clickrate

# 导出的文件路径名
export_file_path=/data/xinniu/coolniu/export/${table_name}_data_${batch_date}

hive -e "
select '${batch_date}' as batch_date,t2.goodsid as goodsid ,t2.goodscname as goodscname,nvl(t1.clicknum,0) as clicknum ,t2.shownum  as shownum  ,concat(round((nvl(t1.clicknum,0)/t2.shownum*100),2),'%') as rate from 
(select goodsid,goods_cname ,count(*) as clicknum from xinniu_iml.look_goods_detail_fact where pt='${batch_date}' group by  goodsid,goods_cname ) t1
 right join
(select goodsid,goodscname ,count(*) as shownum from xinniu_iml.show_goods_brand_fact where pt='${batch_date}'   group by goodsid,goodscname ) t2
on t1.goodsid=t1.goodsid and t1.goods_cname=t2.goodscname;
" 1> ${export_file_path} 2> /dev/null

res=$?
if [ ${res} != 0 ];then
echo 'export date: ${table_name}_data_${batch_date} error! '`date` >> /data/xinniu/coolniu/access/log/step5-idl/shop_goods_clicktop_top2.log
exit 1
else
echo 'export date: ${table_name}_data_${batch_date} success '`date` >> /data/xinniu/coolniu/access/log/step5-idl/shop_goods_clicktop_top2.log
fi

# 导入MySQL
mysqlDBUrl="/bin/mysql -hcoolniushop -p3306 -uroot -phainiu -Dhainiu_dw"
${mysqlDBUrl} <<EOF
CREATE TABLE if not exists ${table_name} (
  id int NOT NULL AUTO_INCREMENT,
  batch_date varchar(30),
  goodsid varchar(10),
  goodscname varchar(30) ,
  clicknum varchar(30) ,
  shownum varchar(256) ,
  clickrate varchar(30),
  PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

delete from ${table_name} where batch_date=${batch_date};

LOAD DATA LOCAL INFILE "${export_file_path}" INTO TABLE ${table_name} FIELDS 
TERMINATED BY '\t'
(batch_date,goodsid, goodscname, clicknum, shownum, clickrate);
EOF

res=$?
if [ ${res} != 0 ];then
echo 'import table:${table_name} error! '`date` >> /data/xinniu/coolniu/access/log/step5-idl/shop_goods_clicktop_top2.log
exit 1
else
echo 'import table:${table_name} success '`date` >> /data/xinniu/coolniu/access/log/step5-idl/shop_goods_clicktop_top2.log
fi

echo "----------------------------------------\n" >> /data/xinniu/coolniu/access/log/step5-idl/shop_goods_clicktop_top2.log

ago7date=`date -d "${batch_date} -7day" +%Y%m%d`
rm -f /data/xinniu/coolniu/export/${table_name}_data_${ago7date}

4.数据可视化

连接 mysql并创建hainiu_dw数据库

create database hainiu_dw;

4.1 mysql数据表如下:

file

4.2 配置zeppelin

访问zeppelin网址:http://11.147.251.157:9898/

file

4.2.1 编辑jdbc链接:

file

4.2.2 上传mysql驱动包到zeppelin lib目录下:

file

4.2.3 配置notebook

file

4.3 编写sql统计点击率:

file

select t.*,t.clicknum/t.shownum as rate from hainiu_dw.xinniu_shop_goods_clickrate t;
版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-青牛,http://hainiubl.com/topics/76421
点赞
成为第一个点赞的人吧 :bowtie:
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter