如何用Java向kafka json格式消息发送json数据

怎么使用java连接kafka_百度知道
怎么使用java连接kafka
我有更好的答案
把你要传递的数据转换成json字符串返回接口,然后手机端调用接口就可以获取到你要传递是值了
【免费测试,验证码5秒必达】
主营:验证码短信、系统通知短信、营销推广短信、手机流量。
为您推荐:
其他类似问题
换一换
回答问题,赢新手礼包
个人、企业类
违法有害信息,请在下方选择后提交
色情、暴力
我们会通过消息、邮箱等方式尽快将举报结果通知您。)写了收集sparkstreaming的日志进入kafka便于后续收集到es中快速统计分析,今天就再写一篇如何在普通应用程序实时收集日志,上一篇写的毕竟是分布式环境下的操作,有一定的特殊性,如MapReduce,Spark运行的日志和普通项目的日志是不太一样的。
所谓的普通程序就是web项目的或者非web项目的的程序,大部分都是单机版本的。
大多数时候,我们的log都会输出到本地的磁盘上,排查问题也是使用linux命令来搞定,如果web程序组成负载集群,那么就有多台机器,如果有几十台机器,几十个服务,那么想快速定位log问题和排查就比较麻烦了,所以很有必要有一个统一的平台管理log,现在大多数公司的套路都是收集重要应用的log集中到kafka中,然后在分别导入到es和hdfs上,一个做实时检索分析,另一个做离线统计和数据备份。
如何能快速收集应用日志到kafka中?
kafka官网已经提供了非常方便的log4j的集成包
kafka-log4j-appender,我们只需要简单配置log4j文件,就能收集应用程序log到kafka中。
#log4j.rootLogger=WARN,console,kafka
log4j.rootLogger=INFO,console
# for package com.demo.kafka, log would be sent to kafka appender.
#log4j.logger.com.bigdata.xuele.streaming.SparkStreamingKmd*=info,kafka
# appender kafka
log4j.appender.kafka=kafka.producer.KafkaLog4jAppender
log4j.appender.kafka.topic=${kafka.log.topic}
# multiple brokers are separated by comma ",".
log4j.appender.kafka.brokerList=${kafka.log.brokers}
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
#log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
log4j.appender.kafka.layout.ConversionPattern=[%d] %p %m (%c)%n
# appender console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
#log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
log4j.appender.console.layout.ConversionPattern=[%d] [%p] [%t] %m%n
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
注意,需要引入maven的依赖包:
&dependency&
&groupId&org.apache.kafka&/groupId&
&artifactId&kafka-clients&/artifactId&
&version&0.8.2.1&/version&
&/dependency&
非常简单,一个maven依赖加一个log4j配置文件即可,如果依然想写入log到本地
文件依然也是可以的,这种方式最简单快速,但是默认的的log日志是一行一行的纯文本,有些场景下我们可能需要json格式的数据。
重写Log4jAppender,自定义输出格式,支持json格式,如果是json格式的数据打入到kafka中,后续收集程序可能就非常方便了,直接拿到json就能入到mongodb或者es中,如果打入到kafka中的数据是纯文本,那么收集程序,可能需要做一些etl,解析其中的一些字段然后再入到es中,所以原生的输出格式,可能稍不灵活,这样就需要我们自己写一些类,然后达到灵活的程度,github连接:
感兴趣的朋友可以看下。
(1)方法一简单快速,不支持json格式的输出,打到kafka的消息都是原样的log日志信息
(2)方法二稍微复杂,需要自己扩展log收集类,但支持json格式的数据输出,对于想落地json数据直接到存储系统中是非常适合的。
此外需要注意,在调试的时候log发送数据到kafka模式最好是同步模式的否则你控制台打印的数据很有可能不会被收集kafka中,程序就停止了。生产环境最好开启异步发送数据模式,因为内部是批量的处理,所以能提升吞吐,但有一定的轻微延迟。
官网log4j-appender的源码:
阅读(...) 评论()使用HttpClient通过post方式发送json数据 – 过往记忆
欢迎关注Hadoop、Spark、Flink、Hive、Hbase、Flume等大数据资料分享微信公共账号:iteblog_hadoop。
文章总数:901
浏览总数:12,000,090
评论:3664
分类目录:95 个 注册用户数:2804 最后更新:日
欢迎关注微信公共帐号:iteblog_hadoop大数据猿:bigdata_ai
  有时候我们在发送HTTP请求的时候会使用到POST方式,如果是传送普通的表单数据那将很方便,直接将参数到一个Key-value形式的Map中即可。但是如果我们需要传送的参数是Json格式的,会稍微有点麻烦,我们可以使用HttpClient类库提供的功能来实现这个需求。假设我们需要发送的数据是:
&blog&: &https://www.iteblog.com&,
&Author&: &iteblog&
我们可以通过JSONObject够着Json:
JSONObject jsonObject = new JSONObject();
jsonObject.put(&blog&, &https://www.iteblog.com&);
jsonObject.put(&Author&, &iteblog&);
如果需要使用Post方式来发送这个数据,我们可以如下实现:
private HttpMethodBase createMethod(String url, int timeout) {
PostMethod method =
method = new PostMethod(url);
JSONObject jsonObject = new JSONObject();
jsonObject.put(&blog&, &https://www.iteblog.com&);
jsonObject.put(&Author&, &iteblog&);
String transJson = jsonObject.toString();
RequestEntity se = new StringRequestEntity(transJson, &application/json&, &UTF-8&);
method.setRequestEntity(se);
//使用系统提供的默认的恢复策略
method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler());
//设置超时的时间
method.getParams().setParameter(HttpMethodParams.SO_TIMEOUT, timeout);
} catch (IllegalArgumentException e) {
logger.error(&非法的URL:{}&, url);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
  我们通过StringRequestEntity来构造请求实体,在这里,StringRequestEntity将接收三个参数,如下:
public StringRequestEntity(String content, String contentType, String charset)
  throws UnsupportedEncodingException
  其中参数content就是我们需要传输的数据;contentType是传送数据的格式,因为我们的数据格式是json的,所以contentType必须填写application/json(更多的contentType可以参见);charset是字符集编码。  然后我们再通过HttpClient对象的executeMethod方法来执行:
int statusCode = httpClient.executeMethod(getMethod);
//只要在获取源码中,服务器返回的不是200代码,则统一认为抓取源码失败,返回null。
if (statusCode != HttpStatus.SC_OK) {
logger.error(&Method failed: & + getMethod.getStatusLine() + &\tstatusCode: & + statusCode);
pom.xml文件的关键内容
&dependencies&
&!--网络爬虫--&
&dependency&
&groupId&commons-httpclient&/groupId&
&artifactId&commons-httpclient&/artifactId&
&version&3.1&/version&
&/dependency&
&dependency&
&groupId&org.apache.httpcomponents&/groupId&
&artifactId&httpcore&/artifactId&
&version&4.3.1&/version&
&/dependency&
&dependency&
&groupId&com.google.guava&/groupId&
&artifactId&guava&/artifactId&
&version&14.0.1&/version&
&/dependency&
&dependency&
&groupId&org.json&/groupId&
&artifactId&json&/artifactId&
&version&&/version&
&/dependency&
&/dependencies&
完整代码下载本博客文章除特别声明,全部都是原创!禁止个人和公司转载本文、谢谢理解:本文链接:
下面文章您可能感兴趣Flume使用json方式接收数据
&最近公司要用flume来抓取手机端app和html5产生的数据,进行分析,考虑到数据使用起来方便些,决定用json方式来接收,于是决定先写个例子测试下;
&我们是这样配置的首先在flume/conf下建立了jsonlog.conf在里面编写配置信息:
jsonlog.sources = jsonsc
jsonlog.sinks = jsonsink
jsonlog.channels = jsoncn
# Describe/configure the source
jsonlog.sources.jsonsc.type =
org.apache.flume.source.http.HTTPSource
jsonlog.sources.jsonsc.bind=**.**.**.**
jsonlog.sources.jsonsc.port = 5140
jsonlog.sources.jsonsc.channels = jsoncn
# Describe the sink
jsonlog.sinks.jsonsink.type = logger
# Use a channel which buffers events in memory
jsonlog.channels.jsoncn.type = memory
jsonlog.channels.jsoncn.capacity = 1000
jsonlog.channels.jsoncn.transactionCapacity = 100
# Bind the source and sink to the channel
jsonlog.sources.jsonsc.channels = jsoncn
jsonlog.sinks.jsonsink.channel = jsoncn
在命令行输入./flume-ng agent -c . -f ../conf/jsonlog.conf -n
jsonlog& 启动flume agent
编写java代码通过httpconnection方式连接发送json格式数据测试运行,客户端报400错误,flume服务端报错如下
WARN http.HTTPSource: Received bad request from client.
org.apache.flume.source.http.HTTPBadRequestException: Request has
invalid JSON Syntax.
org.apache.flume.source.http.JSONHandler.getEvents(JSONHandler.java:119)
org.apache.flume.source.http.HTTPSource$FlumeHTTPServlet.doPost(HTTPSource.java:184)
javax.servlet.http.HttpServlet.service(HttpServlet.java:725)
javax.servlet.http.HttpServlet.service(HttpServlet.java:814)
org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:511)
org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:401)
org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182)
org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:766)
org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)
org.mortbay.jetty.Server.handle(Server.java:326)
org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:542)
org.mortbay.jetty.HttpConnection$RequestHandler.content(HttpConnection.java:945)
org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:756)
org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:218)
org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:404)
org.mortbay.jetty.bio.SocketConnector$Connection.run(SocketConnector.java:228)
org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)
Caused by: com.google.gson.JsonSyntaxException:
java.lang.IllegalStateException: Expected BEGIN_ARRAY but was
BEGIN_OBJECT at line 1 column 2
com.google.gson.Gson.fromJson(Gson.java:806)
com.google.gson.Gson.fromJson(Gson.java:761)
org.apache.flume.source.http.JSONHandler.getEvents(JSONHandler.java:117)
&... 16 more
Caused by: java.lang.IllegalStateException: Expected BEGIN_ARRAY
but was BEGIN_OBJECT at line 1 column 2
com.google.gson.stream.JsonReader.expect(JsonReader.java:339)
com.google.gson.stream.JsonReader.beginArray(JsonReader.java:306)
com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:79)
com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:60)
com.google.gson.Gson.fromJson(Gson.java:795)
&... 18 more
后来查看源代码在org.apache.flume.source.http.HTTPSource类中看到有这样一段说明,告诉我们使用json必须根据他的封装格式进行封装,否则会报400,503错误
然后修改java客户端代码如下
import java.io.BufferedR
import java.io.DataOutputS
import java.io.IOE
import java.io.InputStreamR
import java.io.UnsupportedEncodingExcep
import java.net.HttpURLC
import java.net.MalformedURLE
import java.net.URL;
import java.util.ArrayL
import java.util.HashM
import java.util.L
import java.util.M
import org.apache.flume.event.JSONE
import com.google.gson.G
public class FlumeJsonTest{
&public static void main(String[] args) {
&& //创建连接
&& URL url = new URL("");
&& HttpURLConnection connection =
(HttpURLConnection) url.openConnection();
connection.setDoOutput(true);
connection.setDoInput(true);
connection.setRequestMethod("POST");
connection.setUseCaches(false);
connection.setInstanceFollowRedirects(true);
connection.setRequestProperty("Content-Type",
"application/x-www-form-urlencoded");
&& connection.connect();
&& //POST请求
&& DataOutputStream out = new
DataOutputStream(
connection.getOutputStream());
&& JSONEvent jse = new
JSONEvent();
&& Map ipt = new HashMap();
&& ipt.put("type", "g3");
&& ipt.put("brand", "six
jse.setBody("cc33& test".getBytes());
&& jse.setHeaders(ipt);
&& Gson gson = new Gson();
&& List events1 = new
ArrayList();
&& events1.add(jse);
out.writeBytes(gson.toJson(events1));
&& out.flush();
&& out.close();
&& //读取响应
&& BufferedReader reader = new
BufferedReader(new InputStreamReader(
connection.getInputStream()));
&& StringBuffer sb = new
StringBuffer("");
&& while ((lines =
reader.readLine()) != null) {
&&& lines = new
String(lines.getBytes(), "utf-8");
sb.append(lines);
&& System.out.println(sb);
&& reader.close();
&& // 断开连接
&& connection.disconnect();
& } catch (MalformedURLException e) {
&& // TODO Auto-generated catch
&& e.printStackTrace();
& } catch (UnsupportedEncodingException e) {
&& // TODO Auto-generated catch
&& e.printStackTrace();
& } catch (IOException e) {
&& // TODO Auto-generated catch
&& e.printStackTrace();
测试运行客户端显示发送成功,服务端正确接收数据
15/01/19 18:29:31 INFO sink.LoggerSink: Event: { headers:{brand=six
god, type=g3} body: 63 63 33 33 20 20 74 65 73
74&&&&&&&&&&&&&&&&&&
cc33& test }
已投稿到:
以上网友发言只代表其个人观点,不代表新浪网的观点或立场。3223人阅读
java kafka消息的发送与接收
&&&&&&&&&&&&&&
消息队列在java EE级开发是很常用到的工具之一,在众多消息队列当中,active mq与kafka相对比较受开发者的喜爱,那么kafka是怎样实现消息的发送与接收呢?这里我们的消息通过一个实体类对象来进行封装,前提是你服务器上已经搭建好kafka环境,整个代码如下:
&&&&&& 1、kafka工具类,实现了生产者消费者消息的封装,以及发送消息到kafka上和从kafka上获取消息。package com.starit.ipran.
import java.util.ArrayL
import java.util.A
import java.util.D
import java.util.HashM
import java.util.L
import java.util.M
import java.util.P
import org.apache.kafka.clients.consumer.C
import org.apache.kafka.clients.consumer.ConsumerR
import org.apache.kafka.clients.consumer.ConsumerR
import org.apache.kafka.clients.consumer.KafkaC
import org.apache.kafka.clients.producer.KafkaP
import org.apache.kafka.clients.producer.P
import org.apache.kafka.clients.producer.ProducerR
import org.slf4j.L
import org.slf4j.LoggerF
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONO
import com.starit.ipran.load.C
import com.starit.ipran.model.IpranA
public class KafkaUtils {
&& &private final static Logger LOGGER = LoggerFactory.getLogger(KafkaUtils.class);
&& &private static Producer&String, String&
&& &private static Consumer&String, String&
&& &private KafkaUtils() {
&& & * 生产者,注意kafka生产者不能够从代码上生成主题,只有在服务器上用命令生成
&& &static {
&& &&& &Properties props = new Properties();
&& &&& &props.put(&bootstrap.servers&, Constants.BOOTSTRAP_SERVERS);//服务器ip:端口号,集群用逗号分隔
&& &&& &props.put(&acks&, &all&);
&& &&& &props.put(&retries&, 0);
&& &&& &props.put(&batch.size&, 16384);
&& &&& &props.put(&linger.ms&, 1);
&& &&& &props.put(&buffer.memory&, );
&& &&& &props.put(&key.serializer&, &org.apache.kafka.common.serialization.StringSerializer&);
&& &&& &props.put(&value.serializer&, &org.apache.kafka.common.serialization.StringSerializer&);
&& &&& &producer = new KafkaProducer&&(props);
&& & * 消费者
&& &static {
&& &&& &Properties props = new Properties();
&& &&& &props.put(&bootstrap.servers&, Constants.BOOTSTRAP_SERVERS);//服务器ip:端口号,集群用逗号分隔
&& &&& &props.put(&group.id&, &test&);
&& &&& &props.put(&enable.auto.commit&, &true&);
&& &&& &props.put(&auto.commit.interval.ms&, &1000&);
&& &&& &props.put(&session.timeout.ms&, &30000&);
&& &&& &props.put(&key.deserializer&, &org.apache.kafka.common.serialization.StringDeserializer&);
&& &&& &props.put(&value.deserializer&, &org.apache.kafka.common.serialization.StringDeserializer&);
&& &&& &consumer = new KafkaConsumer&&(props);
&& &&& &consumer.subscribe(Arrays.asList(Constants.TOPIC_NAME));
&& & * 发送对象消息 至kafka上,调用json转化为json字符串,应为kafka存储的是String。
&& & * @param msg
&& &public static void sendMsgToKafka(IpranAlarm msg) {
&& &&& &producer.send(new ProducerRecord&String, String&(Constants.TOPIC_NAME, String.valueOf(new Date().getTime()),
&& &&& &&& &&& &JSON.toJSONString(msg)));
&& & * 从kafka上接收对象消息,将json字符串转化为对象,便于获取消息的时候可以使用get方法获取。
&& &public static void getMsgFromKafka(){
&& &&& &while(true){
&& &&& &&& &ConsumerRecords&String, String& records = KafkaUtils.getKafkaConsumer().poll(100);
&& &&& &&& &if (records.count() & 0) {
&& &&& &&& &&& &for (ConsumerRecord&String, String& record : records) {
&& &&& &&& &&& &&& &JSONObject jsonAlarmMsg = JSON.parseObject(record.value());
&& &&& &&& &&& &&& &IpranAlarm alarmMsg = JSONObject.toJavaObject(jsonAlarmMsg, IpranAlarm.class);
&& &&& &&& &&& &&& &LOGGER.info(&从kafka接收到的消息是:& + alarmMsg.toString());
&& &&& &&& &&& &}
&& &&& &&& &}
&& &public static Consumer&String, String& getKafkaConsumer() {
&& &public static void closeKafkaProducer() {
&& &&& &producer.close();
&& &public static void closeKafkaConsumer() {
&& &&& &consumer.close();
&&&&&& 2、实体类是IpranAlarm,可以用其对象封装消息,调用发送与接收方法就可以实现相应的功能了。
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:39977次
积分:1044
积分:1044
排名:千里之外
原创:62篇
评论:10条
(7)(3)(13)(3)(4)(17)(6)(1)(2)(6)

我要回帖

更多关于 kafka json格式消息 的文章

 

随机推荐