波斯马BOSSMA Information Technology

在.NET平台下使用C#通过Aquiles访问Cassandra

发布时间:2011年11月23日 / 分类:DataBase, NoSQL / 10,314 次浏览 / 评论

Aquiles是访问Cassandra的一个高级API,使用C#语言编写,底层当然还是基于Thrift的。这个高级API有很多好处,比如支持连接池,支持多个集群,支持Windows性能监视器,支持配置文件等等。这篇文章将介绍一些常用的操作。

看了大量的E文,结合自己的测试,大体搞明白了各个部分,然后迷迷糊糊的整了这么一篇文章。因此错漏之处再所难免,还望指正。

1、config配置

<configSections>
        <section name="aquilesConfiguration" type="Aquiles.Core.Configuration.AquilesConfigurationSection,Aquiles.Core"/>
    </configSections>

    <aquilesConfiguration>
        <monitorFeaturesType>Aquiles.Diagnostics.Extension.PerformanceCounterMonitor,
Aquiles.Diagnostics.Extension</monitorFeaturesType>
        <clusters>
            <add friendlyName="Cluster1">
                <connection poolType="SIZECONTROLLEDPOOL" factoryType="FRAMED">
                </connection>
                <endpointManager type="ROUNDROBIN" defaultTimeout="6000">
                    <cassandraEndpoints>
                        <add address="192.168.1.100" port="9160"/>
                    </cassandraEndpoints>
                </endpointManager>
            </add>
        </clusters>
    </aquilesConfiguration>

monitorFeaturesType用于添加到Windows性能监视器;

clusters定义了Casssndra集群;

friendlyName是某个集群的名称;

connection定义了连接池的类型和传输模式(Cassandra默认推荐FRAMED模式);

endpointManager定义了集群中的节点。

关于定义的详细说明,请看这篇文章:
http://aquiles.codeplex.com/wikipage?title=Aquiles.Configuration&referringTitle=Documentation

2、创建KeySpace和ColumnFamily

 // 定义KeySpace
            KsDef objKsDef = new KsDef();
            objKsDef.Name = "Ks1";
            objKsDef.Replication_factor = 1;
            objKsDef.Strategy_class = "org.apache.cassandra.locator.SimpleStrategy";
            objKsDef.Cf_defs = new List<CfDef>();

            // 定义Column Family
            CfDef objCfDef = new CfDef();
            objCfDef.Keyspace = objKsDef.Name;
            objCfDef.Comparator_type = "BytesType";
            objCfDef.Key_cache_size = 20000;
            objCfDef.Row_cache_save_period_in_seconds = 0;
            objCfDef.Key_cache_save_period_in_seconds = 3600;
            objCfDef.Name = "UserToken";

            // 获取Cluster
            string CLUSTERNAME = "Cluster1";
            ICluster cluster = AquilesHelper.RetrieveCluster(CLUSTERNAME);

            // 查询出全部KeySpace
            ExecutionBlock describe_keyspaces = new ExecutionBlock(delegate(Cassandra.Client client)
            {
                return client.describe_keyspaces();
            });
            List<KsDef> ksDefList = (List<KsDef>)cluster.Execute(describe_keyspaces);

            // 检查keyspace和columnFamily是否已经存在
            bool keyspaceExists = false;
            bool columnFamilyExists = false;
            if (ksDefList != null)
            {
                // 遍历keyspace列表,检查keyspace是否已经存在
                foreach (KsDef ksDef in ksDefList)
                {
                    if (ksDef.Name.CompareTo(objKsDef.Name) == 0)
                    {
                        keyspaceExists = true;

                        // 遍历columnFamily列表,检查columnFamily是否已经存在
                        List<CfDef> cfDefList = ksDef.Cf_defs;
                        if (cfDefList != null)
                        {
                            foreach (CfDef cfDef in cfDefList)
                            {
                                if (cfDef.Name.CompareTo(objCfDef.Name) == 0)
                                {
                                    columnFamilyExists = true;
                                    break;
                                }
                            }
                        }
                        break;
                    }
                }
            }

            try
            {
                // 如果keyspace不存在,执行添加
                if (!keyspaceExists)
                {
                    ExecutionBlock create_keyspace = new ExecutionBlock(delegate(Cassandra.Client client)
                    {
                        return client.system_add_keyspace(objKsDef);
                    });
                    cluster.Execute(create_keyspace);
                }

                // 如果columnFamily不存在,执行添加
                if (!columnFamilyExists)
                {
                    ExecutionBlock create_columnFamily = new ExecutionBlock(delegate(Cassandra.Client client)
                    {
                        return client.system_add_column_family(objCfDef);
                    });
                    cluster.Execute(create_columnFamily, objKsDef.Name);
                }
            }
            finally
            {
                //ExecutionBlock drop_keyspace = new ExecutionBlock(delegate(Cassandra.Client client)
                //{
                //    return client.system_drop_keyspace(objKsDef.Name);
                //});
                //cluster.Execute(drop_keyspace);
            }

代码还是比较繁琐的。

3、添加数据

byte[] key = ByteEncoderHelper.UTF8Encoder.ToByteArray("9999");

            ColumnParent columnParent = new ColumnParent()
            {
                Column_family = "UserToken",
            };

            Column column = new Column()
            {
                Name = ByteEncoderHelper.UTF8Encoder.ToByteArray("Token"),
                Value = ByteEncoderHelper.UTF8Encoder.ToByteArray("12345678901"),
                Timestamp = UnixHelper.UnixTimestamp
            };

            ICluster cluster = AquilesHelper.RetrieveCluster("Cluster1");
            cluster.Execute(new ExecutionBlock(delegate(Cassandra.Client client)
            {
                client.insert(key, columnParent, column, ConsistencyLevel.ONE);
                return null;
            }), "Ks1");

4、简单查询数据

查询指定Key,指定Column的数据。需要提供一个ColumnPath,指示查询的路径。

 byte[] key = ByteEncoderHelper.UTF8Encoder.ToByteArray("9999");

            ColumnPath columnPath = new ColumnPath()
            {
                Column = ByteEncoderHelper.UTF8Encoder.ToByteArray("Token"),
                Column_family = "UserToken",
            };

            ICluster cluster = AquilesHelper.RetrieveCluster("Cluster1");
ColumnOrSuperColumn column = (ColumnOrSuperColumn)cluster.Execute(new ExecutionBlock(delegate(Cassandra.Client client)
            {
                return client.get(key, columnPath, ConsistencyLevel.ONE);
            }), "Ks1");

注意查询数据如果不存在,会抛出异常,需要使用者hold住它。

5、移除数据

指定要移除的Key和Column或者ColumnFamily。

byte[] key = ByteEncoderHelper.UTF8Encoder.ToByteArray("9999");

            ColumnPath columnPath = new ColumnPath()
            {
                Column_family = "UserToken",
            };

            ICluster cluster = AquilesHelper.RetrieveCluster("Cluster1");
            cluster.Execute(new ExecutionBlock(delegate(Cassandra.Client client)
            {
                client.remove(key, columnPath, UnixHelper.UnixTimestamp, ConsistencyLevel.ONE);
                return null;
            }), "Ks2");

注意移除数据通过get_range_slice还可以查询出来相应的key,这一点很奇怪。

官方的说法是:

即使没有做移除操作,也完全可能存在有的Row仅有一个key,但其Column数据不存在。

而且在分布式的情况下,将Column数据不存在的Row剔除出来性能上不允许。

原文是:

Because get_range_slice says, “apply this predicate to the range of rows given,” meaning, if the predicate result is empty, we have to include an empty result for that row key. It is perfectly valid to perform such a query returning empty column lists for some or all keys, even if no deletions have been performed.

So to special case leaving out result entries for deletions, we would have to check the entire rest of the row to make sure there is no undeleted data anywhere else either (in which case leaving the key out would be an error).

This is what we used to do with the old get_key_range method, but the performance hit turned out to be unacceptable.

关于分布式删除还可以看这篇文章:

http://wiki.apache.org/cassandra/DistributedDeletes

本博客所有文章如无特别注明均为原创。
复制或转载请以超链接形式注明转自波斯马,原文地址《在.NET平台下使用C#通过Aquiles访问Cassandra

关键字:

建议订阅本站,及时阅读最新文章!
【上一篇】 【下一篇】

发表评论