Inicio / Blog / Integración de Storm y Cassandra
Imagen de Eros
Eros Perez M.
Comp. Science Engineer
23 Ene 2017
Integración
de Storm y Cassandra

Integrar Apache Storm y Cassandra proporcionando una implementación genérica y configurable backtype.storm.Bolt

Body: 

Integración de Storm y Cassandra

Integrar Storm y Cassandra proporcionando una implementación genérica y configurable backtype.storm.Bolt  que escribe Storm Tuple objects a Cassandra Column Family.

La forma en que se escriben los datos de las Tupla Storm a Cassandra es dinámicamente configurable: se proporcionan clases que "determinan" una familia de columnas, una clave de fila y un nombre / valores de columna y el bolt escribirá los datos en un clúster de Cassandra.

Localización del proyecto

El la ubicación del código de este a proyecto es la siguiente: https://github.com/hmsonline/storm-cassandra

Las dependencias del proyecto están disponibles en repositorio Maven central.

Construir desde código

        $ mvn install

El uso

Uso básico

CassandraBolt, TridentCassandraLookupFunction y TridentCassandraWriteFunction espera que un nombre de host, un puerto y un espacio de llaves de Cassandra se establezcan en la configuración de la topología Storm. Para permitir múltiples instancias de estos en una topología y no requieren que todos se conecten a la misma instancia Cassandra los valores se agregan a la configuración de Storm con una clave y un mapa. La clave para indicar qué mapa utilizar se establece en el constructor de estas clases al instanciarlas.

  1.  
  2. Map< string, object="" > cassandraConfig = new HashMap< string, object="" >();
  3. cassandraConfig.put(StormCassandraConstants.CASSANDRA_HOST, "localhost:9160");
  4. cassandraConfig.put(StormCassandraConstants.CASSANDRA_KEYSPACE, "testKeyspace");
  5. Config config = new Config();
  6. config.put("CassandraLocal", cassandraConfig);

La clase CassandraBolt proporciona un constructor de conveniencia que toma como argumentos un nombre de familia de columna y un valor de campo de clave de fila:

  1. IRichBolt cassandraBolt = new CassandraBolt("columnFamily", "rowKey");

El constructor anterior creará un CassandraBolt que escribe en la familia de columnas "columnFamily" y buscará/usará un campo llamado "rowKey" en los objetos backtype.storm.tuple.Tuple que recibe como la clave de fila Cassandra.

Para cada campo en el backtype.storm.Tuple recibido, el CassandraBolt escribirá un par de nombre/valor de columna.

Por ejemplo, dado el constructor listado anteriormente, un valor de tupla de:

  1.       {rowKey: 12345, field1: "foo", field2: "bar}

Daría la siguiente fila de Cassandra (Como se ve de cassandra-cli):

  1.   
  2.    RowKey: 12345
  3.        => (column=field1, value=foo, timestamp=1321938505071001)
  4.        => (column=field2, value=bar, timestamp=1321938505072000)

Cassandra Write Function

Storm Trident filtra la Tupla original si una función no emite nada. Para permitir un procesamiento adicional después de escribir a Cassandra, el TridentCassandraWriteFunction puede emitir un valor de objeto estático. El propósito principal de esta emisión es simplemente permitir que la Tupla continúe en lugar de filtrarlo. El valor estático se puede establecer en el constructor o llamando al método setValueToEmitAfterWrite. Si se establece el valor por emisión en NULL, la función no emitirá nada y Storm filtrará la Tupla. El comportamiento predeterminado es no emitir. Si la función emite un valor, no olvide declarar el campo de salida al construir la topología.

Cassandra Counter Columns

El concepto de contador de columna es similar a la anterior, sin embargo, se debe especificar el rowKey y un valor para especificar la cantidad a incrementar. Serán asumidas todos los demás campos para especificar las columnas que se incrementará en dicha cantidad.

  1. CassandraCounterBatchingBolt logPersistenceBolt = new CassandraCounterBatchingBolt( "columnFamily", "RowKeyField", "IncrementAmountField" );

El constructor anterior creará un Bolt que escribe en la familia de columnas "columnFamily" y utilizará un campo llamado "RowKeyField" en las tuplas que recibe. Se supondrá que todos los campos restantes de la Tupla contienen los nombres de las columnas que se van a incrementar.

Dada la siguiente Tupla:

  1.        {rowKey: 12345, IncrementAmount: 1L, IncrementColumn: 'SomeCounter'}
 

Incrementaría la columna del contador "SomeCounter" en 1L.

Ejemplos

El directorio "Examples" contiene dos ejemplos:

  • CassandraReachTopology
  • PersistentWordCount

Cassandra Reach Topology

El ejemplo de CassandraReachTopology es un ejemplo de RPC distribuido por Storm que es esencialmente un clon de ReachTopology de Nathan Marz, que en lugar de usar almacenes de datos en memoria está respaldado por una base de datos Cassandra y usa bolts genéricos de storm-cassandra para consultar la base de datos.

Persistent Word Count

La topología PersistentWordCount muestra el uso básico de la implementación de Cassandra Bolt. Se reusa el spout TestWordSpout y el bolt TestWordCounter del tutorial de Storm y agrega una instancia de CassandraBolt para persistir los resultados.

Preparación

Para ejecutar los ejemplos, necesitará una base de datos Cassandra que se ejecute en localhost: 9160.

Construir el fuente de ejemplo

  1.  
  2.        $ cd examples
  3.   $ mvn install

Crear esquemas de Cassandra y datos de ejemplo

Instale y ejecute Apache Cassandra.

Cree el esquema de ejemplo utilizando cassandra-cli:

  1.  
  2. $ cd schema
  3.      $ cat cassandra_schema.txt | cassandra-cli -h localhost

Ejecución de la Cassandra Reach Topology

Para ejecutar CassandraReachTopology, ejecute el comando maven siguiente:

  1.  
  2. $ mvn exec:java -Dexec.mainClass=com.hmsonline.storm.cassandra.example.CassandraReachTopology

Entre la salida, debe ver lo siguiente:

  1.  
  2. Reach of http://github.com/hmsonline: 3
  3. Reach of http://github.com/nathanmarz: 3
  4. Reach of http://github.com/ptgoetz: 4
  5. Reach of http://github.com/boneill: 0

Para habilitar el registro de todas las tuplas enviadas dentro de la topología, ejecute el siguiente comando:

  1. $ mvn exec:java -Dexec.mainClass=com.hmsonline.storm.cassandra.example.CassandraReachTopology -Ddebug=true

Ejecución del Ejemplo Persistent Word Count

El ejemplo PersistentWordCount  construye la siguiente topología:

  1. TestWordSpout ==> TestWordCounter ==> CassandraBolt

Data Flow

  1. TestWordSpout emite palabras aleatoriamente desde una lista predefinida.
  2. TestWordCounter recibe una palabra, actualize el contador para esa palabra, y emite una tupla que contiene la palabra y el correspondiente contador ("word", "count").
  3. El CassandraBolt recibe la tupla ("word", "count") y la escribe en la base datos Cassandra usando la palabra como clave de fila  o row key.

Ejecute la topología PersistentWordCount:

  1.      $ mvn exec:java -Dexec.mainClass=com.hmsonline.storm.cassandra.example.PersistentWordCount

Ver el resultado final en cassandra-cli:

  1. $ cassandra-cli -h localhost
  2. [default@unknown] use stormks;
  3.         [default@stromks] list stormcf;

La salida debe parecerse a lo siguiente:

        Using default limit of 100

        -------------------

        RowKey: nathan

        => (column=count, value=22, timestamp=1322332601951001)

        => (column=word, value=nathan, timestamp=1322332601951000)

        -------------------

        RowKey: mike

        => (column=count, value=11, timestamp=1322332600330001)

        => (column=word, value=mike, timestamp=1322332600330000)

        -------------------

        RowKey: jackson

        => (column=count, value=17, timestamp=1322332600633001)

        => (column=word, value=jackson, timestamp=1322332600633000)

        -------------------

        RowKey: golda

        => (column=count, value=31, timestamp=1322332602155001)

        => (column=word, value=golda, timestamp=1322332602155000)

        -------------------

        RowKey: bertels

        => (column=count, value=16, timestamp=1322332602257000)

        => (column=word, value=bertels, timestamp=1322332602255000)

       

        5 Rows Returned.

        Elapsed time: 8 msec(s).