
WSO2 BAM cannot perform group by with long value from cassandra table

CREATE EXTERNAL TABLE IF NOT EXISTS APIRequestData (
    key STRING, api STRING, api_version STRING, userId STRING,
    context STRING, version STRING, trackingCode STRING, referer STRING,
    responseTime BIGINT, request INT, requestTime BIGINT,
    resource STRING, method STRING, hostName STRING, apiPublisher STRING)
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES (
    "wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
    "cassandra.cf.name" = "org_wso2_apimgt_statistics_request",
    "cassandra.columns.mapping" = ":key,payload_api,payload_api_version,payload_userId,payload_context,payload_version,payload_trackingCode,payload_referer,payload_responseTime,payload_request,payload_requestTime,payload_resource,payload_method,payload_hostName,payload_apiPublisher" );

SELECT api, api_version, apiPublisher, context, hostName, referer, responseTime
FROM APIRequestData
GROUP BY api, api_version, apiPublisher, context, hostName, referer, responseTime;

Running the above query fails with the following exception:
java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row
{"key":"1395228740704::10.100.0.52::9453::1","api":"united-airline","api_version":"united-airline:1.0.0","userid":null,"context":"/united-airline","version":null,"trackingcode":null,"referer":"http://localhost:8281/united-airline/1.0.0/","responsetime":1395228740455,"request":null,"requesttime":null,"resource":null,"method":null,"hostname":"rajeeva-ThinkPad-T520.local","apipublisher":"carbon.super"} at org.apache.hadoop.hive.ql.exec.ExecMapper.map(ExecMapper.java:161)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:435)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:371)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:211)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row
{"key":"1395228740704::10.100.0.52::9453::1","api":"united-airline","api_version":"united-airline:1.0.0","userid":null,"context":"/united-airline","version":null,"trackingcode":null,"referer":"http://localhost:8281/united-airline/1.0.0/","responsetime":1395228740455,"request":null,"requesttime":null,"resource":null,"method":null,"hostname":"rajeeva-ThinkPad-T520.local","apipublisher":"carbon.super"} at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:550)
at org.apache.hadoop.hive.ql.exec.ExecMapper.map(ExecMapper.java:143)
... 4 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.hive.serde2.lazy.CassandraLazyLong cannot be cast to org.apache.hadoop.hive.serde2.lazy.LazyLong
at org.apache.hadoop.hive.ql.exec.GroupByOperator.processOp(GroupByOperator.java:720)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:471)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:762)
at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:471)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:762)
at org.apache.hadoop.hive.ql.exec.TableScanOperator.processOp(TableScanOperator.java:83)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:471)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:762)
at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:531)
... 5 more
Caused by: java.lang.ClassCastException: org.apache.hadoop.hive.serde2.lazy.CassandraLazyLong cannot be cast to org.apache.hadoop.hive.serde2.lazy.LazyLong
at org.apache.hadoop.hive.serde2.lazy.objectinspector.primitive.LazyLongObjectInspector.copyObject(LazyLongObjectInspector.java:43)
at org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.copyToStandardObject(ObjectInspectorUtils.java:239)
at org.apache.hadoop.hive.ql.exec.KeyWrapperFactory$ListKeyWrapper.deepCopyElements(KeyWrapperFactory.java:150)
at org.apache.hadoop.hive.ql.exec.KeyWrapperFactory$ListKeyWrapper.deepCopyElements(KeyWrapperFactory.java:142)
at org.apache.hadoop.hive.ql.exec.KeyWrapperFactory$ListKeyWrapper.copyKey(KeyWrapperFactory.java:119)
at org.apache.hadoop.hive.ql.exec.GroupByOperator.processHashAggr(GroupByOperator.java:733)
at org.apache.hadoop.hive.ql.exec.GroupByOperator.processOp(GroupByOperator.java:705)
... 14 more

https://wso2.org/jira/browse/BAM-1507

As noted in the JIRA issue above (BAM-1507), this only happens when doing GROUP BY on long (BIGINT) values. It can easily be worked around by casting the long values to strings in both the SELECT and GROUP BY clauses; this is the usual approach whenever a long value is involved in a GROUP BY. The following is one such example.
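Applied directly to the failing query above, the workaround would look like this (a sketch; `responseTime` is the only BIGINT column in that GROUP BY list):

```sql
-- Cast the BIGINT column to STRING so Hive's GROUP BY does not hit the
-- CassandraLazyLong -> LazyLong ClassCastException shown in the stack trace.
SELECT api, api_version, apiPublisher, context, hostName, referer,
       CAST(responseTime AS STRING) AS responseTime
FROM APIRequestData
GROUP BY api, api_version, apiPublisher, context, hostName, referer,
         CAST(responseTime AS STRING);
```

The important detail is that the CAST must appear in both the SELECT list and the GROUP BY clause, so Hive never compares the raw Cassandra-backed long values.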

CREATE EXTERNAL TABLE IF NOT EXISTS RequestHandlingStats (
    key STRING, payload_refTransactionId STRING, payload_refPersonId STRING,
    payload_statusCode STRING, payload_status STRING, payload_timestamp BIGINT)
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES (
    "wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
    "cassandra.cf.name" = "MediationFlowStream",
    "cassandra.columns.mapping" = ":key,payload_refTransactionId,payload_refPersonId,payload_statusCode,payload_status,payload_timestamp" );

CREATE EXTERNAL TABLE IF NOT EXISTS RequestHandlingStatsDB (
    Ref_Transaction_ID STRING, Ref_Person_ID STRING, Status_Code STRING,
    Status STRING, Time_Stamp STRING, Time STRING)
STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
TBLPROPERTIES (
    'wso2.carbon.datasource.name' = 'WSO2BAM_DATASOURCE_MYSQL',
    'hive.jdbc.update.on.duplicate' = 'true',
    'hive.jdbc.primary.key.fields' = 'Ref_Transaction_ID,Status_Code,Time_Stamp',
    'hive.jdbc.table.create.query' = 'CREATE TABLE REQUEST_HANDLING_STATS (Ref_Transaction_ID VARCHAR(150), Ref_Person_ID VARCHAR(100), Status_Code VARCHAR(100), Status VARCHAR(200), Time_Stamp VARCHAR(100), Time VARCHAR(30))' );


@Incremental(name="requestDataAnalyze", tables="RequestHandlingStats", bufferTime="100")
INSERT OVERWRITE TABLE RequestHandlingStatsDB
SELECT payload_refTransactionId, payload_refPersonId, payload_statusCode, payload_status,
       CAST(payload_timestamp AS STRING),
       from_unixtime(CAST(payload_timestamp/1000 AS BIGINT), 'yyyy-MM-dd HH:mm:ss') AS time
FROM RequestHandlingStats
WHERE payload_timestamp IS NOT NULL
GROUP BY payload_refTransactionId, payload_refPersonId, payload_statusCode, payload_status,
         CAST(payload_timestamp AS STRING),
         from_unixtime(CAST(payload_timestamp/1000 AS BIGINT), 'yyyy-MM-dd HH:mm:ss');
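Note that `payload_timestamp` holds milliseconds, while Hive's `from_unixtime` expects seconds, hence the division by 1000 before the cast. Using the sample timestamp from the stack trace above as a hypothetical input:

```sql
-- 1395228740455 ms / 1000 -> 1395228740 s, which from_unixtime can format.
SELECT from_unixtime(CAST(1395228740455 / 1000 AS BIGINT), 'yyyy-MM-dd HH:mm:ss');
```

Without the division, `from_unixtime` would interpret the millisecond value as seconds and produce a date tens of thousands of years in the future.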
