kafka

发布时间:2024-12-12 07:36

最新推荐文章于 2023-04-24 16:02:01 发布

领悟大数据 于 2018-12-18 11:57:45 发布

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

package com.terry.kafkastream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;

import java.util.Properties;

/**

* 需求:对数据进行清洗操作

*

* 思路:terry-henshuai 把-清洗掉

*/

public class Application {

public static void a(String[] args) {

//1、定义主题 发送到另外一个主题 数据清洗

String oneTopic = "t1";

String twoTopic = "t2";

//2、设置属性

Properties properties = new Properties();

properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"logProcessor");

properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata:9092");

//3、实例对象

StreamsConfig streamsConfig = new StreamsConfig(properties);

//4、流计算

Topology topology = new Topology();

//5、定义kafka组件数据源

topology.addSource("Source", oneTopic).addProcessor("Processor", new ProcessorSupplier<byte[], byte[]>() {

@Override

public Processor<byte[], byte[]> get() {

return new LogProcessor();

}

},"Source").addSink("Sink",twoTopic,"Processor");

//6、实例化

KafkaStreams kafkaStreams = new KafkaStreams(topology, properties);

kafkaStreams.start();

}

}

package com.terry.kafkastream;

import org.apache.kafka.streams.processor.Processor;

import org.apache.kafka.streams.processor.ProcessorContext;

/**
 * Data-cleaning processor. For a value that contains {@code '-'}, only the text between the
 * first {@code '-'} and the next {@code '-'} (or the end of the value) is kept, e.g.
 * {@code terry-henshuai} becomes {@code henshuai}. Values without {@code '-'} and null values
 * are forwarded unchanged. Keys are never modified.
 */
public class LogProcessor implements Processor<byte[], byte[]> {

    /** Fixed charset for decoding/encoding values, so results do not depend on the platform default. */
    private static final java.nio.charset.Charset CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    private ProcessorContext processorContext;

    @Override
    public void init(ProcessorContext processorContext) {
        // Keep the context so process() can forward records downstream
        this.processorContext = processorContext;
    }

    @Override
    public void process(byte[] key, byte[] value) {
        // A null value (e.g. a delete tombstone) has no text to clean; pass it through untouched
        if (value == null) {
            processorContext.forward(key, value);
            return;
        }
        // 1. Decode the message value into a string
        String text = new String(value, CHARSET);
        // 2. Nothing to clean: forward the original bytes so they are not re-encoded lossily
        if (text.indexOf('-') < 0) {
            processorContext.forward(key, value);
            return;
        }
        processorContext.forward(key, clean(text).getBytes(CHARSET));
    }

    /**
     * Extracts the second {@code '-'}-separated field of {@code s}.
     *
     * <p>Matches {@code s.split("-")[1]} wherever that succeeds, but returns an empty string
     * instead of throwing when the field is empty (e.g. {@code "terry-"} or {@code "-"}).
     *
     * @param s non-null text to clean
     * @return the text between the first and second {@code '-'} (or the end of {@code s}), or
     *     {@code s} itself when it contains no {@code '-'}
     */
    static String clean(String s) {
        int start = s.indexOf('-');
        if (start < 0) {
            return s;
        }
        int end = s.indexOf('-', start + 1);
        return end < 0 ? s.substring(start + 1) : s.substring(start + 1, end);
    }

    @Override
    public void close() {
        // No resources of its own to release
    }
}

网址:kafka https://www.yuejiaxmz.com/news/view/450600

相关内容

【HBZ分享】Kafka中日志清理策略
Kafka部分:kafka的原理,解释一下 leader 均衡机制(auto.leader.rebalance.enable=true),高可用和负载均衡的区别
kafka启动报错记录:Connection to node 0 could not be established. Broker may not be available.
WARN [Consumer clientId=connector
WARN [Producer clientId=console
分享个人收集的资源一些关于技术生活的资源 (干货满满)
点赞=转发?Twitter扩大试验规模,小白鼠用户情绪不稳定
【面试经验】字节生活服务二面凉经
2020年下半年最新:字节跳动“算法面试题汇总”让你面试没压力
约翰·威克/分布式新闻头条项目

随便看看