Sunday, February 7, 2016

WSO2 BAM cannot perform group by with long value from cassandra table

-- Hive external table over the Cassandra column family that stores API
-- request events published by WSO2 API Manager.
-- NOTE(review): responseTime, request and requestTime are declared with
-- numeric Hive types (BIGINT/INT); through the Cassandra storage handler
-- these columns are materialized as CassandraLazyLong, which is what makes
-- the GROUP BY below fail (see the ClassCastException in the stack trace).
-- The "cassandra.columns.mapping" list is positional: entries must line up
-- one-to-one with the column list above, so do not reorder either side.
CREATE EXTERNAL TABLE IF NOT EXISTS APIRequestData (key STRING,api STRING,
api_version STRING,userId STRING,
context STRING,version STRING,trackingCode STRING,referer STRING, responseTime BIGINT,request INT, requestTime BIGINT, resource STRING, method STRING, hostName STRING, apiPublisher STRING) STORED BY
'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' WITH SERDEPROPERTIES ( 'wso2.carbon.datasource.name'='WSO2BAM_CASSANDRA_DATASOURCE',
"cassandra.cf.name" = "org_wso2_apimgt_statistics_request",
"cassandra.columns.mapping" = ":key,payload_api, payload_api_version,payload_userId,payload_context,payload_version, payload_trackingCode, payload_referer, payload_responseTime,payload_request, payload_requestTime, payload_resource, payload_method,payload_hostName,payload_apiPublisher" );

select api,api_version,apiPublisher,context, hostName,referer,responseTime from APIRequestData group by api,api_version,apiPublisher,context,hostName,referer,responseTime;

When running the above query, the following exception is thrown:
java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row
{"key":"1395228740704::10.100.0.52::9453::1","api":"united-airline","api_version":"united-airline:1.0.0","userid":null,"context":"/united-airline","version":null,"trackingcode":null,"referer":"http://localhost:8281/united-airline/1.0.0/","responsetime":1395228740455,"request":null,"requesttime":null,"resource":null,"method":null,"hostname":"rajeeva-ThinkPad-T520.local","apipublisher":"carbon.super"} at org.apache.hadoop.hive.ql.exec.ExecMapper.map(ExecMapper.java:161)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:435)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:371)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:211)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row
{"key":"1395228740704::10.100.0.52::9453::1","api":"united-airline","api_version":"united-airline:1.0.0","userid":null,"context":"/united-airline","version":null,"trackingcode":null,"referer":"http://localhost:8281/united-airline/1.0.0/","responsetime":1395228740455,"request":null,"requesttime":null,"resource":null,"method":null,"hostname":"rajeeva-ThinkPad-T520.local","apipublisher":"carbon.super"} at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:550)
at org.apache.hadoop.hive.ql.exec.ExecMapper.map(ExecMapper.java:143)
... 4 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.hive.serde2.lazy.CassandraLazyLong cannot be cast to org.apache.hadoop.hive.serde2.lazy.LazyLong
at org.apache.hadoop.hive.ql.exec.GroupByOperator.processOp(GroupByOperator.java:720)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:471)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:762)
at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:471)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:762)
at org.apache.hadoop.hive.ql.exec.TableScanOperator.processOp(TableScanOperator.java:83)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:471)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:762)
at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:531)
... 5 more
Caused by: java.lang.ClassCastException: org.apache.hadoop.hive.serde2.lazy.CassandraLazyLong cannot be cast to org.apache.hadoop.hive.serde2.lazy.LazyLong
at org.apache.hadoop.hive.serde2.lazy.objectinspector.primitive.LazyLongObjectInspector.copyObject(LazyLongObjectInspector.java:43)
at org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.copyToStandardObject(ObjectInspectorUtils.java:239)
at org.apache.hadoop.hive.ql.exec.KeyWrapperFactory$ListKeyWrapper.deepCopyElements(KeyWrapperFactory.java:150)
at org.apache.hadoop.hive.ql.exec.KeyWrapperFactory$ListKeyWrapper.deepCopyElements(KeyWrapperFactory.java:142)
at org.apache.hadoop.hive.ql.exec.KeyWrapperFactory$ListKeyWrapper.copyKey(KeyWrapperFactory.java:119)
at org.apache.hadoop.hive.ql.exec.GroupByOperator.processHashAggr(GroupByOperator.java:733)
at org.apache.hadoop.hive.ql.exec.GroupByOperator.processOp(GroupByOperator.java:705)
... 14 more

https://wso2.org/jira/browse/BAM-1507

As mentioned in the JIRA issue BAM-1507 linked above, this only happens when doing a GROUP BY on long (BIGINT) values. It can easily be resolved by casting the long values to strings in both the SELECT list and the GROUP BY clause (this is how we always handle GROUP BY when a long value is involved). The following is one such example.

-- Hive external table over the "MediationFlowStream" Cassandra column family.
-- payload_timestamp is the only BIGINT column; it is cast to STRING before
-- any GROUP BY (see the insert query below) to avoid the CassandraLazyLong
-- ClassCastException.
CREATE EXTERNAL TABLE IF NOT EXISTS RequestHandlingStats (
    key STRING,
    payload_refTransactionId STRING,
    payload_refpersonid STRING,
    payload_statusCode STRING,
    payload_status STRING,
    payload_timestamp BIGINT
)
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES (
    -- mapping is positional: one entry per Hive column, in declaration order
    "wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
    "cassandra.cf.name" = "MediationFlowStream",
    "cassandra.columns.mapping" = ":key,payload_refTransactionId,payload_refPersonId,payload_statusCode,payload_status,payload_timestamp" );

-- Target RDBMS table, written through the WSO2 JDBC storage handler.
-- Time_Stamp is deliberately STRING (not BIGINT) so the grouped long value
-- can be stored without re-triggering the GROUP BY cast problem.
CREATE EXTERNAL TABLE IF NOT EXISTS RequestHandlingStatsDB (
    Ref_Transaction_ID STRING,
    Ref_Person_ID STRING,
    Status_Code STRING,
    Status STRING,
    Time_Stamp STRING,
    Time STRING
)
STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
TBLPROPERTIES (
    'wso2.carbon.datasource.name' = 'WSO2BAM_DATASOURCE_MYSQL',
    -- upsert semantics keyed on the composite primary key below
    'hive.jdbc.update.on.duplicate' = 'true',
    'hive.jdbc.primary.key.fields' = 'Ref_Transaction_ID,Status_Code,Time_Stamp',
    -- DDL the handler runs against MySQL if the table does not yet exist
    'hive.jdbc.table.create.query' = 'CREATE TABLE REQUEST_HANDLING_STATS ( Ref_Transaction_ID VARCHAR(150),Ref_Person_ID VARCHAR(100), Status_Code VARCHAR(100),Status VARCHAR(200), Time_Stamp VARCHAR(100), Time VARCHAR(30)) ' );


-- BAM incremental-processing annotation: only rows newer than the previous
-- run of "requestDataAnalyze" (minus a 100 ms buffer) are scanned.
@Incremental(name="requestDataAnalyze", tables="RequestHandlingStats", bufferTime="100")
insert overwrite table RequestHandlingStatsDB
select
    payload_refTransactionId,
    payload_refpersonid,
    payload_statusCode,
    payload_status,
    -- workaround: the BIGINT timestamp is cast to STRING so it can appear
    -- in GROUP BY without the CassandraLazyLong ClassCastException
    cast(payload_timestamp as STRING),
    from_unixtime(cast(payload_timestamp/1000 as BIGINT), 'yyyy-MM-dd HH:mm:ss') as time
from RequestHandlingStats
where payload_timestamp is not NULL
group by
    payload_refTransactionId,
    payload_refpersonid,
    payload_statusCode,
    payload_status,
    cast(payload_timestamp AS STRING),
    from_unixtime(cast(payload_timestamp/1000 as BIGINT), 'yyyy-MM-dd HH:mm:ss');

No comments: